From fe89373d5265a9c41bba75408506a75101e50694 Mon Sep 17 00:00:00 2001 From: Mohan Date: Wed, 28 Jan 2026 17:16:19 +0530 Subject: [PATCH 01/27] the first version of the perf protocol --- examples/perf/perf_example.py | 162 ++++++++++++++++++++ libp2p/perf/__init__.py | 40 +++++ libp2p/perf/constants.py | 8 + libp2p/perf/index.py | 108 +++++++++++++ libp2p/perf/perf_service.py | 279 ++++++++++++++++++++++++++++++++++ 5 files changed, 597 insertions(+) create mode 100644 examples/perf/perf_example.py create mode 100644 libp2p/perf/__init__.py create mode 100644 libp2p/perf/constants.py create mode 100644 libp2p/perf/index.py create mode 100644 libp2p/perf/perf_service.py diff --git a/examples/perf/perf_example.py b/examples/perf/perf_example.py new file mode 100644 index 000000000..1047a1e40 --- /dev/null +++ b/examples/perf/perf_example.py @@ -0,0 +1,162 @@ +""" +Perf protocol example - Measure transfer performance between two libp2p nodes. + +Usage: + # Terminal 1 - Run the server (listener) + python perf_example.py -p 8000 + + # Terminal 2 - Run the client (measures performance to server) + python perf_example.py -p 8001 -d /ip4/127.0.0.1/tcp/8000/p2p/ +""" + +import argparse +import logging + +import multiaddr +import trio + +from libp2p import new_host +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.perf import PerfService, PROTOCOL_NAME + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Default transfer sizes (reduced for testing due to noise protocol message size limit) +# Note: Noise protocol limits messages to 65535 bytes +ONE_UNIT = 16 * 16 # 256 bytes +UPLOAD_BYTES = ONE_UNIT * 10 # 2560 bytes upload +DOWNLOAD_BYTES = ONE_UNIT * 10 # 2560 bytes download + + +async def run_server(host, perf_service) -> None: + """Run as a perf server - listens for incoming perf requests.""" + await perf_service.start() + + print(f"\nPerf server ready, listening on:") + for addr in host.get_addrs(): + print(f" {addr}") + + print(f"\nProtocol: {PROTOCOL_NAME}") + print("\nRun client with:") + print(f" python perf_example.py -d {host.get_addrs()[0]}") + print("\nWaiting for incoming perf requests...") + + await trio.sleep_forever() + + +async def run_client( + host, perf_service, destination: str, upload_bytes: int, download_bytes: int +) -> None: + """Run as a perf client - measures performance to a remote peer.""" + await perf_service.start() + + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + + print(f"\nConnecting to {info.peer_id}...") + await host.connect(info) + print(f"Connected!") + + print(f"\nMeasuring performance:") + print(f" Upload: {upload_bytes} bytes") + print(f" Download: {download_bytes} bytes") + print() + + async for output in perf_service.measure_performance( + maddr, upload_bytes, download_bytes + ): + if output["type"] == "intermediary": + # Progress report + upload_bytes_out = output["upload_bytes"] + download_bytes_out = output["download_bytes"] + time_s = output["time_seconds"] + + if upload_bytes_out > 0: + throughput = upload_bytes_out / time_s if time_s > 0 else 0 + print(f" Uploading: {upload_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)") + elif download_bytes_out > 0: + throughput = download_bytes_out / time_s if time_s > 0 else 0 + print(f" Downloading: {download_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)") + + elif output["type"] == "final": + # Final summary + total_time = output["time_seconds"] + total_upload = output["upload_bytes"] + total_download = output["download_bytes"] + total_data = total_upload + total_download + + print(f"\n{'='*50}") + print(f"Performance Results:") + print(f" Total time: {total_time:.3f} seconds") + print(f" Uploaded: {total_upload} bytes") + print(f" Downloaded: {total_download} bytes") + print(f" Total data: {total_data} bytes") + print(f" Throughput: {total_data / total_time:.0f} bytes/s") + print(f"{'='*50}") + + await perf_service.stop() + + +async def run(port: int, destination: str, upload_mb: int, download_mb: int) -> None: + """Main run function.""" + from libp2p.utils.address_validation import find_free_port + + if port <= 0: + port = find_free_port() + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")] + host = new_host(listen_addrs=listen_addrs) + + # Create perf service + perf_service = PerfService(host) + + async with host.run(listen_addrs=listen_addrs): + if destination: + # Client mode + await run_client( + host, + perf_service, + destination, + upload_mb * ONE_UNIT, + download_mb * ONE_UNIT, + ) + else: + # Server mode + await run_server(host, perf_service) + + +def main() -> None: + description = """ + Perf protocol example - Measure transfer performance between libp2p nodes. + + To use: + 1. Start server: python perf_example.py -p 8000 + 2. Start client: python perf_example.py -d + """ + + parser = argparse.ArgumentParser(description=description) + parser.add_argument("-p", "--port", default=0, type=int, help="listening port") + parser.add_argument("-d", "--destination", type=str, help="destination multiaddr") + parser.add_argument( + "-u", "--upload", default=10, type=int, help="upload size in units of 256 bytes (default: 10)" + ) + parser.add_argument( + "-D", + "--download", + default=10, + type=int, + help="download size in units of 256 bytes (default: 10)", + ) + + args = parser.parse_args() + + try: + trio.run(run, args.port, args.destination, args.upload, args.download) + except KeyboardInterrupt: + print("\nShutting down...") + + +if __name__ == "__main__": + main() diff --git a/libp2p/perf/__init__.py b/libp2p/perf/__init__.py new file mode 100644 index 000000000..2eba91a41 --- /dev/null +++ b/libp2p/perf/__init__.py @@ -0,0 +1,40 @@ +""" +Perf protocol for measuring transfer performance. + +This module implements the libp2p perf protocol as specified in: +https://github.com/libp2p/specs/blob/master/perf/perf.md +""" + +from .constants import ( + MAX_INBOUND_STREAMS, + MAX_OUTBOUND_STREAMS, + PROTOCOL_NAME, + RUN_ON_LIMITED_CONNECTION, + WRITE_BLOCK_SIZE, +) +from .index import ( + IPerf, + PerfComponents, + PerfInit, + PerfOptions, + PerfOutput, +) +from .perf_service import PerfService + +__all__ = [ + # Constants + "PROTOCOL_NAME", + "WRITE_BLOCK_SIZE", + "MAX_INBOUND_STREAMS", + "MAX_OUTBOUND_STREAMS", + "RUN_ON_LIMITED_CONNECTION", + # Types + "PerfOutput", + "PerfInit", + "PerfOptions", + "PerfComponents", + # Interface + "IPerf", + # Implementation + "PerfService", +] diff --git a/libp2p/perf/constants.py b/libp2p/perf/constants.py new file mode 100644 index 000000000..fb63c1f30 --- /dev/null +++ b/libp2p/perf/constants.py @@ -0,0 +1,8 @@ +# Protocol constants for the perf protocol +# https://github.com/libp2p/specs/blob/master/perf/perf.md + +PROTOCOL_NAME = "/perf/1.0.0" +WRITE_BLOCK_SIZE = 64 << 10 # 64KB (65536 bytes) +MAX_INBOUND_STREAMS = 1 +MAX_OUTBOUND_STREAMS = 1 +RUN_ON_LIMITED_CONNECTION = False diff --git a/libp2p/perf/index.py b/libp2p/perf/index.py new file mode 100644 index 000000000..277e47de9 --- /dev/null +++ b/libp2p/perf/index.py @@ -0,0 +1,108 @@ +""" +Perf protocol interfaces and types. + +The perf protocol is used to measure transfer performance within and across +libp2p implementations. + +Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md +""" + +from abc import ABC, abstractmethod +from typing import ( + TYPE_CHECKING, + AsyncIterator, + Literal, + Optional, + TypedDict, +) + +if TYPE_CHECKING: + from multiaddr import Multiaddr + + from libp2p.abc import IHost + + +# ------------------------------- Types ------------------------------- + + +class PerfOutput(TypedDict): + """Output data from a performance measurement.""" + + type: Literal["connection", "stream", "intermediary", "final"] + time_seconds: float + upload_bytes: int + download_bytes: int + + +class PerfInit(TypedDict, total=False): + """Initialization options for the perf service.""" + + protocol_name: str + max_inbound_streams: int + max_outbound_streams: int + run_on_limited_connection: bool + write_block_size: int # Default: 65536 (64KB) + + +class PerfOptions(TypedDict, total=False): + """Options for a performance measurement run.""" + + reuse_existing_connection: bool # Default: False + + +class PerfComponents(TypedDict): + """Components required by the perf service.""" + + host: "IHost" + + +# ------------------------------- Interface ------------------------------- + + +class IPerf(ABC): + """Interface for the perf protocol service.""" + + @abstractmethod + async def start(self) -> None: + """Start the perf service and register the protocol handler.""" + ... + + @abstractmethod + async def stop(self) -> None: + """Stop the perf service and unregister the protocol handler.""" + ... + + @abstractmethod + def is_started(self) -> bool: + """Check if the service is currently running.""" + ... + + @abstractmethod + def measure_performance( + self, + multiaddr: "Multiaddr", + send_bytes: int, + recv_bytes: int, + options: Optional[PerfOptions] = None, + ) -> AsyncIterator[PerfOutput]: + """ + Measure transfer performance to a remote peer. + + Parameters + ---------- + multiaddr : Multiaddr + The address of the remote peer to test against. + send_bytes : int + Number of bytes to upload to the remote peer. + recv_bytes : int + Number of bytes to request the remote peer to send back. + options : PerfOptions, optional + Options for the performance run. + + Yields + ------ + PerfOutput + Progress reports during the transfer, with a final summary at the end. + + """ + ... diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py new file mode 100644 index 000000000..389b658f9 --- /dev/null +++ b/libp2p/perf/perf_service.py @@ -0,0 +1,279 @@ +""" +Perf protocol service implementation. + +This module implements the perf protocol for measuring transfer performance. +""" + +import logging +import struct +import time +from typing import AsyncIterator, Optional + +from multiaddr import Multiaddr + +from libp2p.abc import IHost, INetStream +from libp2p.custom_types import TProtocol +from libp2p.peer.id import ID as PeerID +from libp2p.peer.peerinfo import PeerInfo + +from .constants import ( + MAX_INBOUND_STREAMS, + MAX_OUTBOUND_STREAMS, + PROTOCOL_NAME, + RUN_ON_LIMITED_CONNECTION, + WRITE_BLOCK_SIZE, +) +from .index import IPerf, PerfInit, PerfOptions, PerfOutput + +logger = logging.getLogger(__name__) + + +class PerfService(IPerf): + """ + Implementation of the perf protocol. + + The perf protocol is used to measure transfer performance within and across + libp2p implementations. + """ + + def __init__(self, host: IHost, init: Optional[PerfInit] = None) -> None: + """ + Initialize the PerfService. + + Parameters + ---------- + host : IHost + The libp2p host instance. + init : PerfInit, optional + Initialization options for the service. + + """ + if init is None: + init = {} + + self._host = host + self._started = False + self._protocol = TProtocol(init.get("protocol_name", PROTOCOL_NAME)) + self._write_block_size = init.get("write_block_size", WRITE_BLOCK_SIZE) + self._max_inbound_streams = init.get("max_inbound_streams", MAX_INBOUND_STREAMS) + self._max_outbound_streams = init.get( + "max_outbound_streams", MAX_OUTBOUND_STREAMS + ) + self._run_on_limited_connection = init.get( + "run_on_limited_connection", RUN_ON_LIMITED_CONNECTION + ) + + # Pre-allocate buffer for sending data + self._buf = bytes(self._write_block_size) + + async def start(self) -> None: + """Start the perf service and register the protocol handler.""" + self._host.set_stream_handler(self._protocol, self._handle_message) + self._started = True + logger.debug("Perf service started with protocol %s", self._protocol) + + async def stop(self) -> None: + """Stop the perf service and unregister the protocol handler.""" + # Note: py-libp2p may not have unregister, but we set the flag + self._started = False + logger.debug("Perf service stopped") + + def is_started(self) -> bool: + """Check if the service is currently running.""" + return self._started + + async def _handle_message(self, stream: INetStream) -> None: + """ + Handle incoming perf protocol messages (server side). + + Reads data from the stream, extracts the number of bytes to send back + from the first 8 bytes, then sends that many bytes back. + """ + try: + bytes_to_send_back: Optional[int] = None + + # Read all incoming data + while True: + try: + data = await stream.read(self._write_block_size) + if not data: + break + + # First 8 bytes contain the number of bytes to send back + if bytes_to_send_back is None and len(data) >= 8: + # Big-endian unsigned 64-bit integer + bytes_to_send_back = struct.unpack(">Q", data[:8])[0] + logger.debug("Received request to send back %d bytes", bytes_to_send_back) + except Exception: + break + + if bytes_to_send_back is None: + logger.error("bytes_to_send_back was not set") + await stream.reset() + return + + # Send back the requested number of bytes + while bytes_to_send_back > 0: + to_send = min(self._write_block_size, bytes_to_send_back) + await stream.write(self._buf[:to_send]) + bytes_to_send_back -= to_send + + await stream.close() + + except Exception as e: + logger.error("Error handling perf message: %s", e) + await stream.reset() + + async def measure_performance( + self, + multiaddr: Multiaddr, + send_bytes: int, + recv_bytes: int, + options: Optional[PerfOptions] = None, + ) -> AsyncIterator[PerfOutput]: + """ + Measure transfer performance to a remote peer. + + Parameters + ---------- + multiaddr : Multiaddr + The address of the remote peer to test against. + send_bytes : int + Number of bytes to upload to the remote peer. + recv_bytes : int + Number of bytes to request the remote peer to send back. + options : PerfOptions, optional + Options for the performance run. + + Yields + ------ + PerfOutput + Progress reports during the transfer, with a final summary at the end. + + """ + if options is None: + options = {} + + initial_start_time = time.time() + last_reported_time = time.time() + + # Extract peer ID from multiaddr and connect + peer_id = PeerID.from_base58(str(multiaddr).split("/p2p/")[-1]) + peer_info = PeerInfo(peer_id, [multiaddr]) + + # Connect to the peer + await self._host.connect(peer_info) + logger.debug( + "Opened connection after %.3f ms", + (time.time() - last_reported_time) * 1000, + ) + last_reported_time = time.time() + + # Open a new stream + stream = await self._host.new_stream(peer_id, [self._protocol]) + logger.debug( + "Opened stream after %.3f ms", + (time.time() - last_reported_time) * 1000, + ) + last_reported_time = time.time() + + last_amount_of_bytes_sent = 0 + total_bytes_sent = 0 + upload_start = time.time() + + try: + # Send the number of bytes we want to receive (as big-endian u64) + header = struct.pack(">Q", recv_bytes) + await stream.write(header) + + logger.debug("Sending %d bytes to %s", send_bytes, peer_id) + + # Upload phase + bytes_remaining = send_bytes + while bytes_remaining > 0: + to_send = min(self._write_block_size, bytes_remaining) + await stream.write(self._buf[:to_send]) + bytes_remaining -= to_send + last_amount_of_bytes_sent += to_send + total_bytes_sent += to_send + + # Yield intermediary progress every second + if time.time() - last_reported_time > 1.0: + yield PerfOutput( + type="intermediary", + time_seconds=time.time() - last_reported_time, + upload_bytes=last_amount_of_bytes_sent, + download_bytes=0, + ) + last_reported_time = time.time() + last_amount_of_bytes_sent = 0 + + logger.debug( + "Upload complete after %.3f ms", (time.time() - upload_start) * 1000 + ) + + # Close the write side to signal we're done sending + # Note: close_write() is available on NetStream but not on INetStream interface + if hasattr(stream, "close_write"): + await stream.close_write() # type: ignore[attr-defined] + + # Download phase + last_amount_of_bytes_received = 0 + last_reported_time = time.time() + total_bytes_received = 0 + download_start = time.time() + + while total_bytes_received < recv_bytes: + try: + data = await stream.read(self._write_block_size) + if not data: + break + + last_amount_of_bytes_received += len(data) + total_bytes_received += len(data) + + # Yield intermediary progress every second + if time.time() - last_reported_time > 1.0: + yield PerfOutput( + type="intermediary", + time_seconds=time.time() - last_reported_time, + upload_bytes=0, + download_bytes=last_amount_of_bytes_received, + ) + last_reported_time = time.time() + last_amount_of_bytes_received = 0 + except Exception: + break + + logger.debug( + "Download complete after %.3f ms", (time.time() - download_start) * 1000 + ) + + if total_bytes_received != recv_bytes: + raise ValueError( + f"Expected to receive {recv_bytes} bytes, " + f"but received {total_bytes_received}" + ) + + # Yield final result + yield PerfOutput( + type="final", + time_seconds=time.time() - initial_start_time, + upload_bytes=total_bytes_sent, + download_bytes=total_bytes_received, + ) + + logger.debug("Performed %s to %s", self._protocol, peer_id) + + except Exception as e: + logger.error( + "Error sending %d/%d bytes to %s: %s", + total_bytes_sent, + send_bytes, + peer_id, + e, + ) + await stream.reset() + raise + finally: + await stream.close() From ba19233929fccad8e03d3a06292ca76a790185cd Mon Sep 17 00:00:00 2001 From: Mohan Date: Wed, 28 Jan 2026 17:16:53 +0530 Subject: [PATCH 02/27] fixed the lint errors --- examples/perf/perf_example.py | 28 ++++++++++++++++++---------- libp2p/perf/index.py | 5 ++--- libp2p/perf/perf_service.py | 12 +++++++----- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/examples/perf/perf_example.py b/examples/perf/perf_example.py index 1047a1e40..4a59af505 100644 --- a/examples/perf/perf_example.py +++ b/examples/perf/perf_example.py @@ -17,7 +17,7 @@ from libp2p import new_host from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.perf import PerfService, PROTOCOL_NAME +from libp2p.perf import PROTOCOL_NAME, PerfService # Configure logging logging.basicConfig(level=logging.INFO) @@ -34,7 +34,7 @@ async def run_server(host, perf_service) -> None: """Run as a perf server - listens for incoming perf requests.""" await perf_service.start() - print(f"\nPerf server ready, listening on:") + print("\nPerf server ready, listening on:") for addr in host.get_addrs(): print(f" {addr}") @@ -57,9 +57,9 @@ async def run_client( print(f"\nConnecting to {info.peer_id}...") await host.connect(info) - print(f"Connected!") + print("Connected!") - print(f"\nMeasuring performance:") + print("\nMeasuring performance:") print(f" Upload: {upload_bytes} bytes") print(f" Download: {download_bytes} bytes") print() @@ -75,10 +75,14 @@ async def run_client( if upload_bytes_out > 0: throughput = upload_bytes_out / time_s if time_s > 0 else 0 - print(f" Uploading: {upload_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)") + print( + f" Uploading: {upload_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)" + ) elif download_bytes_out > 0: throughput = download_bytes_out / time_s if time_s > 0 else 0 - print(f" Downloading: {download_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)") + print( + f" Downloading: {download_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)" + ) elif output["type"] == "final": # Final summary @@ -87,14 +91,14 @@ async def run_client( total_download = output["download_bytes"] total_data = total_upload + total_download - print(f"\n{'='*50}") - print(f"Performance Results:") + print(f"\n{'=' * 50}") + print("Performance Results:") print(f" Total time: {total_time:.3f} seconds") print(f" Uploaded: {total_upload} bytes") print(f" Downloaded: {total_download} bytes") print(f" Total data: {total_data} bytes") print(f" Throughput: {total_data / total_time:.0f} bytes/s") - print(f"{'='*50}") + print(f"{'=' * 50}") await perf_service.stop() @@ -140,7 +144,11 @@ def main() -> None: parser.add_argument("-p", "--port", default=0, type=int, help="listening port") parser.add_argument("-d", "--destination", type=str, help="destination multiaddr") parser.add_argument( - "-u", "--upload", default=10, type=int, help="upload size in units of 256 bytes (default: 10)" + "-u", + "--upload", + default=10, + type=int, + help="upload size in units of 256 bytes (default: 10)", ) parser.add_argument( "-D", diff --git a/libp2p/perf/index.py b/libp2p/perf/index.py index 277e47de9..827652432 100644 --- a/libp2p/perf/index.py +++ b/libp2p/perf/index.py @@ -8,11 +8,10 @@ """ from abc import ABC, abstractmethod +from collections.abc import AsyncIterator from typing import ( TYPE_CHECKING, - AsyncIterator, Literal, - Optional, TypedDict, ) @@ -83,7 +82,7 @@ def measure_performance( multiaddr: "Multiaddr", send_bytes: int, recv_bytes: int, - options: Optional[PerfOptions] = None, + options: PerfOptions | None = None, ) -> AsyncIterator[PerfOutput]: """ Measure transfer performance to a remote peer. diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 389b658f9..26ef7b70f 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -4,10 +4,10 @@ This module implements the perf protocol for measuring transfer performance. """ +from collections.abc import AsyncIterator import logging import struct import time -from typing import AsyncIterator, Optional from multiaddr import Multiaddr @@ -36,7 +36,7 @@ class PerfService(IPerf): libp2p implementations. """ - def __init__(self, host: IHost, init: Optional[PerfInit] = None) -> None: + def __init__(self, host: IHost, init: PerfInit | None = None) -> None: """ Initialize the PerfService. @@ -90,7 +90,7 @@ async def _handle_message(self, stream: INetStream) -> None: from the first 8 bytes, then sends that many bytes back. """ try: - bytes_to_send_back: Optional[int] = None + bytes_to_send_back: int | None = None # Read all incoming data while True: @@ -103,7 +103,9 @@ async def _handle_message(self, stream: INetStream) -> None: if bytes_to_send_back is None and len(data) >= 8: # Big-endian unsigned 64-bit integer bytes_to_send_back = struct.unpack(">Q", data[:8])[0] - logger.debug("Received request to send back %d bytes", bytes_to_send_back) + logger.debug( + "Received request to send back %d bytes", bytes_to_send_back + ) except Exception: break @@ -129,7 +131,7 @@ async def measure_performance( multiaddr: Multiaddr, send_bytes: int, recv_bytes: int, - options: Optional[PerfOptions] = None, + options: PerfOptions | None = None, ) -> AsyncIterator[PerfOutput]: """ Measure transfer performance to a remote peer. From a9d4848035c295e181452bc00be6a5b547d627b6 Mon Sep 17 00:00:00 2001 From: Mohan Date: Thu, 29 Jan 2026 14:23:09 +0530 Subject: [PATCH 03/27] changed the description --- libp2p/perf/index.py | 4 ---- libp2p/perf/perf_service.py | 3 +-- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/libp2p/perf/index.py b/libp2p/perf/index.py index 827652432..8359e98c1 100644 --- a/libp2p/perf/index.py +++ b/libp2p/perf/index.py @@ -1,9 +1,5 @@ """ Perf protocol interfaces and types. - -The perf protocol is used to measure transfer performance within and across -libp2p implementations. - Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md """ diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 26ef7b70f..d77567896 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -1,7 +1,6 @@ """ Perf protocol service implementation. - -This module implements the perf protocol for measuring transfer performance. +Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md """ from collections.abc import AsyncIterator From d977f509dd2e513a94ca16fc4831f1087f245247 Mon Sep 17 00:00:00 2001 From: Mohan Date: Sat, 31 Jan 2026 13:06:57 +0530 Subject: [PATCH 04/27] fixed the yamux and noise header size errors --- examples/perf/perf_example.py | 7 ------- libp2p/perf/constants.py | 2 +- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/examples/perf/perf_example.py b/examples/perf/perf_example.py index 4a59af505..3387e421e 100644 --- a/examples/perf/perf_example.py +++ b/examples/perf/perf_example.py @@ -10,7 +10,6 @@ """ import argparse -import logging import multiaddr import trio @@ -19,12 +18,6 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.perf import PROTOCOL_NAME, PerfService -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Default transfer sizes (reduced for testing due to noise protocol message size limit) -# Note: Noise protocol limits messages to 65535 bytes ONE_UNIT = 16 * 16 # 256 bytes UPLOAD_BYTES = ONE_UNIT * 10 # 2560 bytes upload DOWNLOAD_BYTES = ONE_UNIT * 10 # 2560 bytes download diff --git a/libp2p/perf/constants.py b/libp2p/perf/constants.py index fb63c1f30..f4068d43d 100644 --- a/libp2p/perf/constants.py +++ b/libp2p/perf/constants.py @@ -2,7 +2,7 @@ # https://github.com/libp2p/specs/blob/master/perf/perf.md PROTOCOL_NAME = "/perf/1.0.0" -WRITE_BLOCK_SIZE = 64 << 10 # 64KB (65536 bytes) +WRITE_BLOCK_SIZE = 65500 MAX_INBOUND_STREAMS = 1 MAX_OUTBOUND_STREAMS = 1 RUN_ON_LIMITED_CONNECTION = False From 2bff259a0e03b80e1ce22ff6977c1fc0f4b1b5c7 Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 1 Feb 2026 14:15:36 +0100 Subject: [PATCH 05/27] wip: first interop perf experiment --- interop/perf/Dockerfile | 36 +++ interop/perf/perf_test.py | 626 ++++++++++++++++++++++++++++++++++++ interop/perf/pyproject.toml | 22 ++ 3 files changed, 684 insertions(+) create mode 100644 interop/perf/Dockerfile create mode 100644 interop/perf/perf_test.py create mode 100644 interop/perf/pyproject.toml diff --git a/interop/perf/Dockerfile b/interop/perf/Dockerfile new file mode 100644 index 000000000..73a4154b7 --- /dev/null +++ b/interop/perf/Dockerfile @@ -0,0 +1,36 @@ +FROM python:3.13-slim + +WORKDIR /app + +# Install system dependencies (contributing.rst + transport Dockerfile) +RUN apt-get update && apt-get install -y \ + redis-tools \ + build-essential \ + cmake \ + pkg-config \ + libgmp-dev \ + git \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install uv (docs/contributing.rst Option 2: same as CI) +RUN pip install --no-cache-dir uv +RUN uv --version + +# Build context is perf/images/python/v0.x (path in images.yaml) +COPY py-libp2p /app/py-libp2p + +RUN uv venv /app/venv + +# Interop/perf project: depends on libp2p from copied clone +COPY py-libp2p/interop/perf/pyproject.toml . +RUN uv pip install --python /app/venv/bin/python --upgrade pip && \ + uv pip install --python /app/venv/bin/python --no-cache-dir -e . + +COPY py-libp2p/interop/perf/perf_test.py . + +ENV PYTHONUNBUFFERED=1 +ENV CI=true +ENV PATH="/app/venv/bin:${PATH}" + +ENTRYPOINT ["/app/venv/bin/python", "perf_test.py"] diff --git a/interop/perf/perf_test.py b/interop/perf/perf_test.py new file mode 100644 index 000000000..6eaad399c --- /dev/null +++ b/interop/perf/perf_test.py @@ -0,0 +1,626 @@ +#!/usr/bin/env python3 +""" +Python libp2p perf test implementation for test-plans perf. + +Follows docs/write-a-perf-test-app.md: +- Reads configuration from environment variables +- Connects to Redis for coordination (SET/GET to match Rust) +- Implements both listener and dialer roles +- Measures upload/download throughput and latency +- Outputs results in YAML format to stdout (stderr for logs) +""" + +from datetime import datetime, timedelta, timezone +import ipaddress +import logging +import os +import ssl +import sys +import tempfile +import time +from typing import Any + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID +import multiaddr +import redis +import trio + +try: + ExceptionGroup # noqa: B018 +except NameError: + from exceptiongroup import ExceptionGroup # type: ignore[no-redef] + +from libp2p import create_mplex_muxer_option, create_yamux_muxer_option, new_host +from libp2p.crypto.ed25519 import create_new_key_pair +from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair +from libp2p.custom_types import TProtocol +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport +from libp2p.security.noise.transport import ( + PROTOCOL_ID as NOISE_PROTOCOL_ID, + Transport as NoiseTransport, +) +from libp2p.security.tls.transport import ( + PROTOCOL_ID as TLS_PROTOCOL_ID, + TLSTransport, +) +from libp2p.utils.address_validation import get_available_interfaces +from libp2p.perf import PerfService, PROTOCOL_NAME + +MAX_TEST_TIMEOUT = 300 +logger = logging.getLogger("libp2p.perf_test") + + +def configure_logging() -> None: + """Configure logging based on DEBUG environment variable.""" + debug_value = os.getenv("DEBUG") or "false" + debug_enabled = debug_value.upper() in ["DEBUG", "1", "TRUE", "YES"] + + for logger_name in ["multiaddr", "multiaddr.transforms", "multiaddr.codecs", "multiaddr.codecs.cid"]: + logging.getLogger(logger_name).setLevel(logging.WARNING) + + if debug_enabled: + for name in ["", "libp2p.perf_test", "libp2p", "libp2p.perf"]: + logging.getLogger(name).setLevel(logging.DEBUG) + print("Debug logging enabled", file=sys.stderr) + else: + logging.getLogger().setLevel(logging.INFO) + logging.getLogger("libp2p.perf_test").setLevel(logging.INFO) + for name in ["libp2p", "libp2p.transport"]: + logging.getLogger(name).setLevel(logging.WARNING) + + +def _percentile(sorted_values: list[float], p: float) -> float: + """Linear interpolation percentile.""" + n = len(sorted_values) + if n == 0: + return 0.0 + if n == 1: + return sorted_values[0] + index = (p / 100.0) * (n - 1) + lower = int(index) + upper = min(lower + 1, n - 1) + weight = index - lower + return sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight + + +def _is_connection_closed_error(exc: BaseException) -> bool: + """True if this is the expected 'Connection closed' from swarm/mplex on shutdown.""" + msg = str(exc).lower() + if "connection closed" in msg: + return True + if isinstance(exc, ExceptionGroup): + return all(_is_connection_closed_error(e) for e in exc.exceptions) + return False + + +def _compute_stats(samples: list[float], is_latency: bool = False) -> dict[str, Any]: + """Compute min, q1, median, q3, max, outliers, samples (IQR-based).""" + if not samples: + return { + "min": 0.0, "q1": 0.0, "median": 0.0, "q3": 0.0, "max": 0.0, + "outliers": [], "samples": [], + } + sorted_vals = sorted(samples) + n = len(sorted_vals) + q1 = _percentile(sorted_vals, 25.0) + median = _percentile(sorted_vals, 50.0) + q3 = _percentile(sorted_vals, 75.0) + iqr = q3 - q1 + lower_fence = q1 - 1.5 * iqr + upper_fence = q3 + 1.5 * iqr + outliers = [v for v in sorted_vals if v < lower_fence or v > upper_fence] + non_outliers = [v for v in sorted_vals if lower_fence <= v <= upper_fence] + if non_outliers: + min_val, max_val = non_outliers[0], non_outliers[-1] + else: + min_val, max_val = sorted_vals[0], sorted_vals[-1] + fmt = "{:.3f}" if is_latency else "{:.2f}" + return { + "min": min_val, "q1": q1, "median": median, "q3": q3, "max": max_val, + "outliers": [float(fmt.format(x)) for x in outliers], + "samples": [float(fmt.format(x)) for x in sorted_vals], + } + + +class PerfTest: + def __init__(self) -> None: + self.transport = os.getenv("TRANSPORT") + if not self.transport: + raise ValueError("TRANSPORT environment variable is required") + standalone = ["quic-v1"] + self.muxer: str | None = None + self.security: str | None = None + if self.transport not in standalone: + self.muxer = os.getenv("MUXER") or "" + self.security = os.getenv("SECURE_CHANNEL") or "" + if not self.muxer or not self.security: + raise ValueError("MUXER and SECURE_CHANNEL required for non-standalone transport") + else: + self.muxer = os.getenv("MUXER") + self.security = os.getenv("SECURE_CHANNEL") + + is_dialer_val = os.getenv("IS_DIALER") + if is_dialer_val is None: + raise ValueError("IS_DIALER environment variable is required") + self.is_dialer = is_dialer_val == "true" + + self.ip = os.getenv("LISTENER_IP") or "0.0.0.0" + self.redis_addr = os.getenv("REDIS_ADDR") + if not self.redis_addr: + raise ValueError("REDIS_ADDR environment variable is required") + if ":" in self.redis_addr: + self.redis_host, port = self.redis_addr.split(":", 1) + self.redis_port = int(port) + else: + self.redis_host = self.redis_addr + self.redis_port = 6379 + + self.test_key = os.getenv("TEST_KEY") + if not self.test_key: + raise ValueError("TEST_KEY environment variable is required") + + self.upload_bytes = int(os.getenv("UPLOAD_BYTES") or "1073741824") + self.download_bytes = int(os.getenv("DOWNLOAD_BYTES") or "1073741824") + self.upload_iterations = int(os.getenv("UPLOAD_ITERATIONS") or "10") + self.download_iterations = int(os.getenv("DOWNLOAD_ITERATIONS") or "10") + self.latency_iterations = int(os.getenv("LATENCY_ITERATIONS") or "100") + + timeout_val = os.getenv("TEST_TIMEOUT_SECS") or "180" + self.test_timeout_seconds = min(int(timeout_val), MAX_TEST_TIMEOUT) + + self.host: Any = None + self.redis_client: redis.Redis[str] | None = None + self.perf_service: PerfService | None = None + + def validate_configuration(self) -> None: + valid_transports = ["tcp", "ws", "wss", "quic-v1"] + valid_security = ["noise", "plaintext", "tls"] + valid_muxers = ["mplex", "yamux"] + standalone = ["quic-v1"] + if self.transport not in valid_transports: + raise ValueError(f"Unsupported transport: {self.transport}. Supported: {valid_transports}") + if self.transport not in standalone: + if self.security not in valid_security: + raise ValueError(f"Unsupported security: {self.security}. Supported: {valid_security}") + if self.muxer not in valid_muxers: + raise ValueError(f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}") + + def create_security_options(self) -> tuple[dict[TProtocol, Any], Any]: + standalone = ["quic-v1"] + if self.transport in standalone: + return {}, create_new_key_pair() + key_pair = create_new_key_pair() + if self.security == "noise": + noise_kp = create_new_x25519_key_pair() + noise_transport = NoiseTransport( + libp2p_keypair=key_pair, + noise_privkey=noise_kp.private_key, + early_data=None, + ) + return {NOISE_PROTOCOL_ID: noise_transport}, key_pair + if self.security == "tls": + tls_transport = TLSTransport( + libp2p_keypair=key_pair, + early_data=None, + muxers=None, + ) + return {TLS_PROTOCOL_ID: tls_transport}, key_pair + if self.security == "plaintext": + pt = InsecureTransport( + local_key_pair=key_pair, + secure_bytes_provider=None, + peerstore=None, + ) + return {PLAINTEXT_PROTOCOL_ID: pt}, key_pair + raise ValueError(f"Unsupported security: {self.security}") + + def create_muxer_options(self) -> Any: + if self.transport in ["quic-v1"]: + return None + if self.muxer == "yamux": + return create_yamux_muxer_option() + if self.muxer == "mplex": + return create_mplex_muxer_option() + raise ValueError(f"Unsupported muxer: {self.muxer}") + + def create_tls_client_config(self) -> ssl.SSLContext | None: + if self.transport == "wss": + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + return None + + def create_tls_server_config(self) -> ssl.SSLContext | None: + if self.transport == "wss": + try: + pk = rsa.generate_private_key(public_exponent=65537, key_size=2048) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, "libp2p.local"), + ]) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(pk.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .sign(pk, hashes.SHA256()) + ) + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as cf, \ + tempfile.NamedTemporaryFile(mode="wb", delete=False) as kf: + cf.write(cert.public_bytes(serialization.Encoding.PEM)) + kf.write(pk.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + )) + ctx.load_cert_chain(cf.name, kf.name) + try: + os.unlink(cf.name) + os.unlink(kf.name) + except Exception: + pass + return ctx + except Exception as e: + print(f"WARNING: TLS server config failed: {e}", file=sys.stderr) + return None + return None + + def _get_ip_value(self, addr: multiaddr.Multiaddr) -> str | None: + return addr.value_for_protocol("ip4") or addr.value_for_protocol("ip6") + + def _get_protocol_names(self, addr: multiaddr.Multiaddr) -> list[str]: + return [p.name for p in addr.protocols()] + + def _extract_and_preserve_p2p(self, addr: multiaddr.Multiaddr) -> tuple[multiaddr.Multiaddr, str | None]: + p2p_value = None + if "p2p" in self._get_protocol_names(addr): + p2p_value = addr.value_for_protocol("p2p") + if p2p_value: + addr = addr.decapsulate(multiaddr.Multiaddr(f"/p2p/{p2p_value}")) + return addr, p2p_value + + def _encapsulate_with_p2p(self, addr: multiaddr.Multiaddr, p2p_value: str | None) -> multiaddr.Multiaddr: + if p2p_value: + return addr.encapsulate(multiaddr.Multiaddr(f"/p2p/{p2p_value}")) + return addr + + def _build_quic_addr(self, ip_value: str, port: int) -> multiaddr.Multiaddr: + if ":" in ip_value: + base = multiaddr.Multiaddr(f"/ip6/{ip_value}/udp/{port}") + else: + base = multiaddr.Multiaddr(f"/ip4/{ip_value}/udp/{port}") + return base.encapsulate(multiaddr.Multiaddr("/quic-v1")) + + def create_listen_addresses(self, port: int = 0) -> list[multiaddr.Multiaddr]: + base_addrs = get_available_interfaces(port, protocol="tcp") + if self.transport == "quic-v1": + out = [] + for addr in base_addrs: + ip_value = self._get_ip_value(addr) + tcp_port = addr.value_for_protocol("tcp") or port + if ip_value: + qa = self._build_quic_addr(ip_value, tcp_port) + _, p2p = self._extract_and_preserve_p2p(addr) + qa = self._encapsulate_with_p2p(qa, p2p) + out.append(qa) + return out if out else [self._build_quic_addr("0.0.0.0", port)] + if self.transport == "ws": + out = [] + for addr in base_addrs: + try: + names = self._get_protocol_names(addr) + if "ws" in names or "wss" in names: + out.append(addr) + else: + a, p2p = self._extract_and_preserve_p2p(addr) + wa = a.encapsulate(multiaddr.Multiaddr("/ws")) + out.append(self._encapsulate_with_p2p(wa, p2p)) + except Exception: + pass + return out if out else [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}/ws")] + if self.transport == "wss": + out = [] + for addr in base_addrs: + try: + names = self._get_protocol_names(addr) + a, p2p = self._extract_and_preserve_p2p(addr) + if "wss" in names: + out.append(addr) + elif "ws" in names: + a_ = a.decapsulate(multiaddr.Multiaddr("/ws")) + wss = a_.encapsulate(multiaddr.Multiaddr("/wss")) + out.append(self._encapsulate_with_p2p(wss, p2p)) + else: + wss = a.encapsulate(multiaddr.Multiaddr("/wss")) + out.append(self._encapsulate_with_p2p(wss, p2p)) + except Exception: + pass + return out if out else [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}/wss")] + return base_addrs + + def _filter_addresses_by_transport(self, addresses: list[multiaddr.Multiaddr]) -> list[multiaddr.Multiaddr]: + out = [] + for addr in addresses: + names = self._get_protocol_names(addr) + if self.transport == "ws" and ("ws" in names or "wss" in names): + out.append(addr) + elif self.transport == "wss" and "wss" in names: + out.append(addr) + elif self.transport == "quic-v1" and "quic-v1" in names: + out.append(addr) + elif self.transport == "tcp" and not any(p in names for p in ["ws", "wss", "quic-v1"]): + out.append(addr) + return out if out else addresses + + def get_container_ip(self) -> str: + import socket + import subprocess + try: + r = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=5) + if r.returncode == 0 and r.stdout.strip(): + return r.stdout.strip().split()[0] + except Exception: + pass + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except Exception: + return "172.17.0.1" + + def _replace_loopback_ip(self, addr: multiaddr.Multiaddr) -> str: + ip_value = self._get_ip_value(addr) + if ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]: + return str(addr) + actual = self.get_container_ip() + names = self._get_protocol_names(addr) + is_ipv6 = "ip6" in names + parts = [f"/ip6/{actual}" if is_ipv6 else f"/ip4/{actual}"] + found = False + for p in addr.protocols(): + if p.name in ["ip4", "ip6"]: + found = True + continue + if found: + parts.append(f"/{p.name}/{p.value}" if p.value else f"/{p.name}") + return str(multiaddr.Multiaddr("".join(parts))) + + def _get_publishable_address(self, addresses: list[multiaddr.Multiaddr]) -> str: + filtered = self._filter_addresses_by_transport(addresses) + if not filtered: + filtered = addresses + for addr in filtered: + ip_value = self._get_ip_value(addr) + if ip_value and ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]: + return str(addr) + return self._replace_loopback_ip(filtered[0]) + + async def _connect_redis_with_retry(self, max_retries: int = 10, retry_delay: float = 1.0) -> None: + print("Connecting to Redis...", file=sys.stderr) + for attempt in range(max_retries): + try: + self.redis_client = redis.Redis( + host=self.redis_host, + port=self.redis_port, + decode_responses=True, + ) + self.redis_client.ping() + print(f"Connected to Redis on attempt {attempt + 1}", file=sys.stderr) + return + except Exception as e: + print(f"Redis attempt {attempt + 1} failed: {e}", file=sys.stderr) + if attempt < max_retries - 1: + await trio.sleep(retry_delay) + raise RuntimeError(f"Failed to connect to Redis after {max_retries} attempts") + + async def run_listener(self) -> None: + self.validate_configuration() + await self._connect_redis_with_retry() + + sec_opt, key_pair = self.create_security_options() + muxer_opt = self.create_muxer_options() + listen_addrs = self.create_listen_addresses(0) + tls_client = self.create_tls_client_config() + tls_server = self.create_tls_server_config() + + self.host = new_host( + key_pair=key_pair, + sec_opt=sec_opt, + muxer_opt=muxer_opt, + listen_addrs=listen_addrs, + enable_quic=(self.transport == "quic-v1"), + tls_client_config=tls_client, + tls_server_config=tls_server, + ) + self.perf_service = PerfService(self.host) + await self.perf_service.start() + print(f"Perf service started (protocol {PROTOCOL_NAME})", file=sys.stderr) + + async with self.host.run(listen_addrs=listen_addrs): + all_addrs = self.host.get_addrs() + if not all_addrs: + raise RuntimeError("No listen addresses available") + actual_addr = self._get_publishable_address(all_addrs) + print(f"Publishing address: {actual_addr}", file=sys.stderr) + redis_key = f"{self.test_key}_listener_multiaddr" + assert self.redis_client is not None + self.redis_client.set(redis_key, actual_addr) + print("Listener ready, waiting for dialer...", file=sys.stderr) + await trio.sleep_forever() + + async def _wait_for_listener_addr(self) -> str: + redis_key = f"{self.test_key}_listener_multiaddr" + timeout = min(self.test_timeout_seconds, MAX_TEST_TIMEOUT) + deadline = time.monotonic() + timeout + assert self.redis_client is not None + while time.monotonic() < deadline: + addr = self.redis_client.get(redis_key) + if addr: + return addr + await trio.sleep(0.5) + raise RuntimeError(f"Timeout waiting for listener address (key {redis_key}) after {timeout}s") + + async def _one_measurement( + self, + send_bytes: int, + recv_bytes: int, + ) -> float: + """Run one measure_performance call and return elapsed time in seconds.""" + assert self.host is not None + assert self.perf_service is not None + maddr = multiaddr.Multiaddr(self.listener_addr) + start = time.monotonic() + async for _ in self.perf_service.measure_performance(maddr, send_bytes, recv_bytes): + pass + return time.monotonic() - start + + async def run_dialer(self) -> None: + self.validate_configuration() + await self._connect_redis_with_retry() + print("Waiting for listener address...", file=sys.stderr) + self.listener_addr = await self._wait_for_listener_addr() + print(f"Got listener address: {self.listener_addr}", file=sys.stderr) + + sec_opt, key_pair = self.create_security_options() + muxer_opt = self.create_muxer_options() + # Dialer needs listen_addrs for ws/wss so transport is registered; for quic/tcp pass [] (host.run still starts swarm/nursery) + dialer_listen_addrs = self.create_listen_addresses(0) if self.transport in ["ws", "wss"] else None + tls_client = self.create_tls_client_config() + tls_server = None + + kw: dict[str, Any] = { + "key_pair": key_pair, + "sec_opt": sec_opt, + "muxer_opt": muxer_opt, + "enable_quic": (self.transport == "quic-v1"), + "tls_client_config": tls_client, + "tls_server_config": tls_server, + } + if dialer_listen_addrs: + kw["listen_addrs"] = dialer_listen_addrs + self.host = new_host(**kw) + self.perf_service = PerfService(self.host) + await self.perf_service.start() + + # Must run host inside host.run() so swarm/nursery are active (required for connect and QUIC) + try: + async with self.host.run(listen_addrs=dialer_listen_addrs or []): + # Brief delay so listener is fully listening before we dial + await trio.sleep(1.0) + + maddr = multiaddr.Multiaddr(self.listener_addr) + info = info_from_p2p_addr(maddr) + listener_peer_id = info.peer_id + await self.host.connect(info) + print("Connected to listener", file=sys.stderr) + + upload_samples: list[float] = [] + for i in range(self.upload_iterations): + elapsed = await self._one_measurement(self.upload_bytes, 0) + gbps = (self.upload_bytes * 8.0) / elapsed / 1e9 if elapsed > 0 else 0.0 + upload_samples.append(gbps) + print(f"Upload iteration {i+1}/{self.upload_iterations}: {gbps:.2f} Gbps", file=sys.stderr) + + download_samples: list[float] = [] + for i in range(self.download_iterations): + elapsed = await self._one_measurement(0, self.download_bytes) + gbps = (self.download_bytes * 8.0) / elapsed / 1e9 if elapsed > 0 else 0.0 + download_samples.append(gbps) + print(f"Download iteration {i+1}/{self.download_iterations}: {gbps:.2f} Gbps", file=sys.stderr) + + latency_samples: list[float] = [] + for i in range(self.latency_iterations): + elapsed = await self._one_measurement(1, 1) + latency_samples.append(elapsed * 1000.0) + print("Latency iterations done", file=sys.stderr) + + u = _compute_stats(upload_samples, is_latency=False) + d = _compute_stats(download_samples, is_latency=False) + l = _compute_stats(latency_samples, is_latency=True) + + # YAML to stdout only (per write-a-perf-test-app.md) + print("upload:") + print(f" iterations: {self.upload_iterations}") + print(f" min: {u['min']:.2f}") + print(f" q1: {u['q1']:.2f}") + print(f" median: {u['median']:.2f}") + print(f" q3: {u['q3']:.2f}") + print(f" max: {u['max']:.2f}") + print(f" outliers: {u['outliers']}") + print(f" samples: {u['samples']}") + print(" unit: Gbps") + print("download:") + print(f" iterations: {self.download_iterations}") + print(f" min: {d['min']:.2f}") + print(f" q1: {d['q1']:.2f}") + print(f" median: {d['median']:.2f}") + print(f" q3: {d['q3']:.2f}") + print(f" max: {d['max']:.2f}") + print(f" outliers: {d['outliers']}") + print(f" samples: {d['samples']}") + print(" unit: Gbps") + print("latency:") + print(f" iterations: {self.latency_iterations}") + print(f" min: {l['min']:.3f}") + print(f" q1: {l['q1']:.3f}") + print(f" median: {l['median']:.3f}") + print(f" q3: {l['q3']:.3f}") + print(f" max: {l['max']:.3f}") + print(f" outliers: {l['outliers']}") + print(f" samples: {l['samples']}") + print(" unit: ms") + + # Graceful close: disconnect listener so it sees a clean close, then stop services + try: + await self.host.disconnect(listener_peer_id) + await trio.sleep(0.5) + except Exception as e: + logger.debug("Disconnect: %s", e) + try: + await self.perf_service.stop() + except Exception as e: + logger.debug("PerfService.stop: %s", e) + if self.redis_client: + try: + self.redis_client.close() + except Exception: + pass + except BaseException as e: + # Swarm/mplex may raise "Connection closed" when we disconnect; treat as success + if not _is_connection_closed_error(e): + raise + + async def run(self) -> None: + try: + await self._connect_redis_with_retry() + if self.is_dialer: + await self.run_dialer() + else: + await self.run_listener() + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + import traceback + traceback.print_exc(file=sys.stderr) + if self.redis_client: + self.redis_client.close() + sys.exit(1) + + +async def main() -> None: + configure_logging() + test = PerfTest() + await test.run() + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/perf/pyproject.toml b/interop/perf/pyproject.toml new file mode 100644 index 000000000..899922697 --- /dev/null +++ b/interop/perf/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "py-libp2p-perf-test" +version = "0.1.0" +description = "Python libp2p perf test implementation for test-plans perf" +authors = [ + {name = "libp2p", email = "team@libp2p.io"} +] +requires-python = ">=3.11" +dependencies = [ + "libp2p @ file:///app/py-libp2p", + "redis>=4.0.0", + "typing-extensions>=4.0.0", + "cryptography>=41.0.0", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["perf_test*"] From 5eded13c516380ba42aa5432a437ff7b228454cc Mon Sep 17 00:00:00 2001 From: acul71 Date: Sun, 1 Feb 2026 14:46:21 +0100 Subject: [PATCH 06/27] fix: Dockerfile as transport --- interop/perf/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/interop/perf/Dockerfile b/interop/perf/Dockerfile index 73a4154b7..1329537b4 100644 --- a/interop/perf/Dockerfile +++ b/interop/perf/Dockerfile @@ -17,17 +17,17 @@ RUN apt-get update && apt-get install -y \ RUN pip install --no-cache-dir uv RUN uv --version -# Build context is perf/images/python/v0.x (path in images.yaml) -COPY py-libp2p /app/py-libp2p +# Build context = repo root (same as interop/transport/Dockerfile for GitHub + local) +COPY . /app/py-libp2p RUN uv venv /app/venv # Interop/perf project: depends on libp2p from copied clone -COPY py-libp2p/interop/perf/pyproject.toml . +COPY interop/perf/pyproject.toml . RUN uv pip install --python /app/venv/bin/python --upgrade pip && \ uv pip install --python /app/venv/bin/python --no-cache-dir -e . -COPY py-libp2p/interop/perf/perf_test.py . +COPY interop/perf/perf_test.py . ENV PYTHONUNBUFFERED=1 ENV CI=true From 2c48b4c6cb7e0bd511018b03436491911ff29795 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 02:02:41 +0530 Subject: [PATCH 07/27] fixed the lint issue --- interop/perf/perf_test.py | 127 ++++++++++++++++++++++++++++---------- 1 file changed, 94 insertions(+), 33 deletions(-) diff --git a/interop/perf/perf_test.py b/interop/perf/perf_test.py index 6eaad399c..da8451fba 100644 --- a/interop/perf/perf_test.py +++ b/interop/perf/perf_test.py @@ -11,7 +11,6 @@ """ from datetime import datetime, timedelta, timezone -import ipaddress import logging import os import ssl @@ -38,6 +37,7 @@ from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair from libp2p.custom_types import TProtocol from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.perf import PROTOCOL_NAME, PerfService from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport from libp2p.security.noise.transport import ( PROTOCOL_ID as NOISE_PROTOCOL_ID, @@ -48,7 +48,6 @@ TLSTransport, ) from libp2p.utils.address_validation import get_available_interfaces -from libp2p.perf import PerfService, PROTOCOL_NAME MAX_TEST_TIMEOUT = 300 logger = logging.getLogger("libp2p.perf_test") @@ -59,7 +58,12 @@ def configure_logging() -> None: debug_value = os.getenv("DEBUG") or "false" debug_enabled = debug_value.upper() in ["DEBUG", "1", "TRUE", "YES"] - for logger_name in ["multiaddr", "multiaddr.transforms", "multiaddr.codecs", "multiaddr.codecs.cid"]: + for logger_name in [ + "multiaddr", + "multiaddr.transforms", + "multiaddr.codecs", + "multiaddr.codecs.cid", + ]: logging.getLogger(logger_name).setLevel(logging.WARNING) if debug_enabled: @@ -101,8 +105,13 @@ def _compute_stats(samples: list[float], is_latency: bool = False) -> dict[str, """Compute min, q1, median, q3, max, outliers, samples (IQR-based).""" if not samples: return { - "min": 0.0, "q1": 0.0, "median": 0.0, "q3": 0.0, "max": 0.0, - "outliers": [], "samples": [], + "min": 0.0, + "q1": 0.0, + "median": 0.0, + "q3": 0.0, + "max": 0.0, + "outliers": [], + "samples": [], } sorted_vals = sorted(samples) n = len(sorted_vals) @@ -120,7 +129,11 @@ def _compute_stats(samples: list[float], is_latency: bool = False) -> dict[str, min_val, max_val = sorted_vals[0], sorted_vals[-1] fmt = "{:.3f}" if is_latency else "{:.2f}" return { - "min": min_val, "q1": q1, "median": median, "q3": q3, "max": max_val, + "min": min_val, + "q1": q1, + "median": median, + "q3": q3, + "max": max_val, "outliers": [float(fmt.format(x)) for x in outliers], "samples": [float(fmt.format(x)) for x in sorted_vals], } @@ -138,7 +151,9 @@ def __init__(self) -> None: self.muxer = os.getenv("MUXER") or "" self.security = os.getenv("SECURE_CHANNEL") or "" if not self.muxer or not self.security: - raise ValueError("MUXER and SECURE_CHANNEL required for non-standalone transport") + raise ValueError( + "MUXER and SECURE_CHANNEL required for non-standalone transport" + ) else: self.muxer = os.getenv("MUXER") self.security = os.getenv("SECURE_CHANNEL") @@ -182,12 +197,18 @@ def validate_configuration(self) -> None: valid_muxers = ["mplex", "yamux"] standalone = ["quic-v1"] if self.transport not in valid_transports: - raise ValueError(f"Unsupported transport: {self.transport}. Supported: {valid_transports}") + raise ValueError( + f"Unsupported transport: {self.transport}. Supported: {valid_transports}" + ) if self.transport not in standalone: if self.security not in valid_security: - raise ValueError(f"Unsupported security: {self.security}. Supported: {valid_security}") + raise ValueError( + f"Unsupported security: {self.security}. Supported: {valid_security}" + ) if self.muxer not in valid_muxers: - raise ValueError(f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}") + raise ValueError( + f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}" + ) def create_security_options(self) -> tuple[dict[TProtocol, Any], Any]: standalone = ["quic-v1"] @@ -239,9 +260,11 @@ def create_tls_server_config(self) -> ssl.SSLContext | None: if self.transport == "wss": try: pk = rsa.generate_private_key(public_exponent=65537, key_size=2048) - subject = issuer = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, "libp2p.local"), - ]) + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, "libp2p.local"), + ] + ) cert = ( x509.CertificateBuilder() .subject_name(subject) @@ -255,14 +278,18 @@ def create_tls_server_config(self) -> ssl.SSLContext | None: ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - with tempfile.NamedTemporaryFile(mode="wb", delete=False) as cf, \ - tempfile.NamedTemporaryFile(mode="wb", delete=False) as kf: + with ( + tempfile.NamedTemporaryFile(mode="wb", delete=False) as cf, + tempfile.NamedTemporaryFile(mode="wb", delete=False) as kf, + ): cf.write(cert.public_bytes(serialization.Encoding.PEM)) - kf.write(pk.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - )) + kf.write( + pk.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + ) ctx.load_cert_chain(cf.name, kf.name) try: os.unlink(cf.name) @@ -281,7 +308,9 @@ def _get_ip_value(self, addr: multiaddr.Multiaddr) -> str | None: def _get_protocol_names(self, addr: multiaddr.Multiaddr) -> list[str]: return [p.name for p in addr.protocols()] - def _extract_and_preserve_p2p(self, addr: multiaddr.Multiaddr) -> tuple[multiaddr.Multiaddr, str | None]: + def _extract_and_preserve_p2p( + self, addr: multiaddr.Multiaddr + ) -> tuple[multiaddr.Multiaddr, str | None]: p2p_value = None if "p2p" in self._get_protocol_names(addr): p2p_value = addr.value_for_protocol("p2p") @@ -289,7 +318,9 @@ def _extract_and_preserve_p2p(self, addr: multiaddr.Multiaddr) -> tuple[multiadd addr = addr.decapsulate(multiaddr.Multiaddr(f"/p2p/{p2p_value}")) return addr, p2p_value - def _encapsulate_with_p2p(self, addr: multiaddr.Multiaddr, p2p_value: str | None) -> multiaddr.Multiaddr: + def _encapsulate_with_p2p( + self, addr: multiaddr.Multiaddr, p2p_value: str | None + ) -> multiaddr.Multiaddr: if p2p_value: return addr.encapsulate(multiaddr.Multiaddr(f"/p2p/{p2p_value}")) return addr @@ -348,7 +379,9 @@ def create_listen_addresses(self, port: int = 0) -> list[multiaddr.Multiaddr]: return out if out else [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}/wss")] return base_addrs - def _filter_addresses_by_transport(self, addresses: list[multiaddr.Multiaddr]) -> list[multiaddr.Multiaddr]: + def _filter_addresses_by_transport( + self, addresses: list[multiaddr.Multiaddr] + ) -> list[multiaddr.Multiaddr]: out = [] for addr in addresses: names = self._get_protocol_names(addr) @@ -358,15 +391,20 @@ def _filter_addresses_by_transport(self, addresses: list[multiaddr.Multiaddr]) - out.append(addr) elif self.transport == "quic-v1" and "quic-v1" in names: out.append(addr) - elif self.transport == "tcp" and not any(p in names for p in ["ws", "wss", "quic-v1"]): + elif self.transport == "tcp" and not any( + p in names for p in ["ws", "wss", "quic-v1"] + ): out.append(addr) return out if out else addresses def get_container_ip(self) -> str: import socket import subprocess + try: - r = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=5) + r = subprocess.run( + ["hostname", "-I"], capture_output=True, text=True, timeout=5 + ) if r.returncode == 0 and r.stdout.strip(): return r.stdout.strip().split()[0] except Exception: @@ -405,7 +443,9 @@ def _get_publishable_address(self, addresses: list[multiaddr.Multiaddr]) -> str: return str(addr) return self._replace_loopback_ip(filtered[0]) - async def _connect_redis_with_retry(self, max_retries: int = 10, retry_delay: float = 1.0) -> None: + async def _connect_redis_with_retry( + self, max_retries: int = 10, retry_delay: float = 1.0 + ) -> None: print("Connecting to Redis...", file=sys.stderr) for attempt in range(max_retries): try: @@ -468,7 +508,9 @@ async def _wait_for_listener_addr(self) -> str: if addr: return addr await trio.sleep(0.5) - raise RuntimeError(f"Timeout waiting for listener address (key {redis_key}) after {timeout}s") + raise RuntimeError( + f"Timeout waiting for listener address (key {redis_key}) after {timeout}s" + ) async def _one_measurement( self, @@ -480,7 +522,9 @@ async def _one_measurement( assert self.perf_service is not None maddr = multiaddr.Multiaddr(self.listener_addr) start = time.monotonic() - async for _ in self.perf_service.measure_performance(maddr, send_bytes, recv_bytes): + async for _ in self.perf_service.measure_performance( + maddr, send_bytes, recv_bytes + ): pass return time.monotonic() - start @@ -494,7 +538,9 @@ async def run_dialer(self) -> None: sec_opt, key_pair = self.create_security_options() muxer_opt = self.create_muxer_options() # Dialer needs listen_addrs for ws/wss so transport is registered; for quic/tcp pass [] (host.run still starts swarm/nursery) - dialer_listen_addrs = self.create_listen_addresses(0) if self.transport in ["ws", "wss"] else None + dialer_listen_addrs = ( + self.create_listen_addresses(0) if self.transport in ["ws", "wss"] else None + ) tls_client = self.create_tls_client_config() tls_server = None @@ -527,16 +573,30 @@ async def run_dialer(self) -> None: upload_samples: list[float] = [] for i in range(self.upload_iterations): elapsed = await self._one_measurement(self.upload_bytes, 0) - gbps = (self.upload_bytes * 8.0) / elapsed / 1e9 if elapsed > 0 else 0.0 + gbps = ( + (self.upload_bytes * 8.0) / elapsed / 1e9 + if elapsed > 0 + else 0.0 + ) upload_samples.append(gbps) - print(f"Upload iteration {i+1}/{self.upload_iterations}: {gbps:.2f} Gbps", file=sys.stderr) + print( + f"Upload iteration {i + 1}/{self.upload_iterations}: {gbps:.2f} Gbps", + file=sys.stderr, + ) download_samples: list[float] = [] for i in range(self.download_iterations): elapsed = await self._one_measurement(0, self.download_bytes) - gbps = (self.download_bytes * 8.0) / elapsed / 1e9 if elapsed > 0 else 0.0 + gbps = ( + (self.download_bytes * 8.0) / elapsed / 1e9 + if elapsed > 0 + else 0.0 + ) download_samples.append(gbps) - print(f"Download iteration {i+1}/{self.download_iterations}: {gbps:.2f} Gbps", file=sys.stderr) + print( + f"Download iteration {i + 1}/{self.download_iterations}: {gbps:.2f} Gbps", + file=sys.stderr, + ) latency_samples: list[float] = [] for i in range(self.latency_iterations): @@ -610,6 +670,7 @@ async def run(self) -> None: except Exception as e: print(f"Error: {e}", file=sys.stderr) import traceback + traceback.print_exc(file=sys.stderr) if self.redis_client: self.redis_client.close() From 195f7114e4268660419acfbf96ccdb482ed3daa0 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 02:29:50 +0530 Subject: [PATCH 08/27] moved the interfaces to abc.py file and restructure the code --- libp2p/abc.py | 54 +++++++++++++++++++ libp2p/perf/__init__.py | 4 +- libp2p/perf/index.py | 103 ------------------------------------ libp2p/perf/perf_service.py | 4 +- libp2p/perf/types.py | 45 ++++++++++++++++ 5 files changed, 103 insertions(+), 107 deletions(-) delete mode 100644 libp2p/perf/index.py create mode 100644 libp2p/perf/types.py diff --git a/libp2p/abc.py b/libp2p/abc.py index 06eb106c4..e7bda3cdf 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -4,6 +4,7 @@ ) from collections.abc import ( AsyncIterable, + AsyncIterator, Iterable, KeysView, Sequence, @@ -3098,3 +3099,56 @@ async def publish(self, topic_id: str | list[str], data: bytes) -> None: """ ... + + +# -------------------------- perf interface.py -------------------------- + + +class IPerf(ABC): + """ + Interface for the perf protocol service. + + Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md + """ + + @abstractmethod + async def start(self) -> None: + """Start the perf service and register the protocol handler.""" + ... + + @abstractmethod + async def stop(self) -> None: + """Stop the perf service and unregister the protocol handler.""" + ... + + @abstractmethod + def is_started(self) -> bool: + """Check if the service is currently running.""" + ... + + @abstractmethod + def measure_performance( + self, + multiaddr: Multiaddr, + send_bytes: int, + recv_bytes: int, + ) -> AsyncIterator[Any]: + """ + Measure transfer performance to a remote peer. + + Parameters + ---------- + multiaddr : Multiaddr + The address of the remote peer to test against. + send_bytes : int + Number of bytes to upload to the remote peer. + recv_bytes : int + Number of bytes to request the remote peer to send back. + + Yields + ------ + PerfOutput + Progress reports during the transfer, with a final summary at the end. + + """ + ... diff --git a/libp2p/perf/__init__.py b/libp2p/perf/__init__.py index 2eba91a41..67df485a0 100644 --- a/libp2p/perf/__init__.py +++ b/libp2p/perf/__init__.py @@ -12,13 +12,13 @@ RUN_ON_LIMITED_CONNECTION, WRITE_BLOCK_SIZE, ) -from .index import ( - IPerf, +from .types import ( PerfComponents, PerfInit, PerfOptions, PerfOutput, ) +from libp2p.abc import IPerf from .perf_service import PerfService __all__ = [ diff --git a/libp2p/perf/index.py b/libp2p/perf/index.py deleted file mode 100644 index 8359e98c1..000000000 --- a/libp2p/perf/index.py +++ /dev/null @@ -1,103 +0,0 @@ -""" -Perf protocol interfaces and types. -Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md -""" - -from abc import ABC, abstractmethod -from collections.abc import AsyncIterator -from typing import ( - TYPE_CHECKING, - Literal, - TypedDict, -) - -if TYPE_CHECKING: - from multiaddr import Multiaddr - - from libp2p.abc import IHost - - -# ------------------------------- Types ------------------------------- - - -class PerfOutput(TypedDict): - """Output data from a performance measurement.""" - - type: Literal["connection", "stream", "intermediary", "final"] - time_seconds: float - upload_bytes: int - download_bytes: int - - -class PerfInit(TypedDict, total=False): - """Initialization options for the perf service.""" - - protocol_name: str - max_inbound_streams: int - max_outbound_streams: int - run_on_limited_connection: bool - write_block_size: int # Default: 65536 (64KB) - - -class PerfOptions(TypedDict, total=False): - """Options for a performance measurement run.""" - - reuse_existing_connection: bool # Default: False - - -class PerfComponents(TypedDict): - """Components required by the perf service.""" - - host: "IHost" - - -# ------------------------------- Interface ------------------------------- - - -class IPerf(ABC): - """Interface for the perf protocol service.""" - - @abstractmethod - async def start(self) -> None: - """Start the perf service and register the protocol handler.""" - ... - - @abstractmethod - async def stop(self) -> None: - """Stop the perf service and unregister the protocol handler.""" - ... - - @abstractmethod - def is_started(self) -> bool: - """Check if the service is currently running.""" - ... - - @abstractmethod - def measure_performance( - self, - multiaddr: "Multiaddr", - send_bytes: int, - recv_bytes: int, - options: PerfOptions | None = None, - ) -> AsyncIterator[PerfOutput]: - """ - Measure transfer performance to a remote peer. - - Parameters - ---------- - multiaddr : Multiaddr - The address of the remote peer to test against. - send_bytes : int - Number of bytes to upload to the remote peer. - recv_bytes : int - Number of bytes to request the remote peer to send back. - options : PerfOptions, optional - Options for the performance run. - - Yields - ------ - PerfOutput - Progress reports during the transfer, with a final summary at the end. - - """ - ... diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index d77567896..a680cbaa6 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -10,7 +10,7 @@ from multiaddr import Multiaddr -from libp2p.abc import IHost, INetStream +from libp2p.abc import IHost, INetStream, IPerf from libp2p.custom_types import TProtocol from libp2p.peer.id import ID as PeerID from libp2p.peer.peerinfo import PeerInfo @@ -22,7 +22,7 @@ RUN_ON_LIMITED_CONNECTION, WRITE_BLOCK_SIZE, ) -from .index import IPerf, PerfInit, PerfOptions, PerfOutput +from .types import PerfInit, PerfOptions, PerfOutput logger = logging.getLogger(__name__) diff --git a/libp2p/perf/types.py b/libp2p/perf/types.py new file mode 100644 index 000000000..4e25803ad --- /dev/null +++ b/libp2p/perf/types.py @@ -0,0 +1,45 @@ +""" +Perf protocol types. + +Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md +""" + +from typing import ( + TYPE_CHECKING, + Literal, + TypedDict, +) + +if TYPE_CHECKING: + from libp2p.abc import IHost + + +class PerfOutput(TypedDict): + """Output data from a performance measurement.""" + + type: Literal["connection", "stream", "intermediary", "final"] + time_seconds: float + upload_bytes: int + download_bytes: int + + +class PerfInit(TypedDict, total=False): + """Initialization options for the perf service.""" + + protocol_name: str + max_inbound_streams: int + max_outbound_streams: int + run_on_limited_connection: bool + write_block_size: int # Default: 65536 (64KB) + + +class PerfOptions(TypedDict, total=False): + """Options for a performance measurement run.""" + + reuse_existing_connection: bool # Default: False + + +class PerfComponents(TypedDict): + """Components required by the perf service.""" + + host: "IHost" From efdb1cda4ef3776461465c32607a7273d24e70ac Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 02:38:45 +0530 Subject: [PATCH 09/27] pushed the .rst files --- docs/examples.perf.rst | 69 ++++++++++++++++++++++++++++++++++++++++++ docs/examples.rst | 1 + docs/libp2p.perf.rst | 41 +++++++++++++++++++++++++ docs/libp2p.rst | 1 + 4 files changed, 112 insertions(+) create mode 100644 docs/examples.perf.rst create mode 100644 docs/libp2p.perf.rst diff --git a/docs/examples.perf.rst b/docs/examples.perf.rst new file mode 100644 index 000000000..00b0532f2 --- /dev/null +++ b/docs/examples.perf.rst @@ -0,0 +1,69 @@ +Perf Protocol Demo +================== + +This example demonstrates how to use the libp2p ``perf`` protocol to measure +transfer performance between two nodes. + +The perf protocol sends and receives data to measure throughput, reporting +both intermediary progress and final results. + +Running the Example +------------------- + +First, start the server in one terminal: + +.. code-block:: console + + $ python examples/perf/perf_example.py -p 8000 + + Perf server ready, listening on: + /ip4/127.0.0.1/tcp/8000/p2p/QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN + + Protocol: /perf/1.0.0 + + Run client with: + python perf_example.py -d /ip4/127.0.0.1/tcp/8000/p2p/QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN + + Waiting for incoming perf requests... + +Then, in another terminal, run the client with the multiaddr from the server: + +.. code-block:: console + + $ python examples/perf/perf_example.py -d /ip4/127.0.0.1/tcp/8000/p2p/QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN + + Connecting to QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN... + Connected! + + Measuring performance: + Upload: 2560 bytes + Download: 2560 bytes + + Uploading: 2560 bytes in 0.01s (256000 bytes/s) + Downloading: 2560 bytes in 0.01s (256000 bytes/s) + + ================================================== + Performance Results: + Total time: 0.025 seconds + Uploaded: 2560 bytes + Downloaded: 2560 bytes + Total data: 5120 bytes + Throughput: 204800 bytes/s + ================================================== + +Command Line Options +-------------------- + +.. code-block:: text + + -p, --port Listening port (default: random free port) + -d, --destination Destination multiaddr (if not set, runs as server) + -u, --upload Upload size in units of 256 bytes (default: 10) + -D, --download Download size in units of 256 bytes (default: 10) + +Source Code +----------- + +.. literalinclude:: ../examples/perf/perf_example.py + :language: python + :linenos: diff --git a/docs/examples.rst b/docs/examples.rst index 56c27a033..7b3aa7dbb 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -23,3 +23,4 @@ Examples examples.multiple_connections examples.websocket examples.tls + examples.perf diff --git a/docs/libp2p.perf.rst b/docs/libp2p.perf.rst new file mode 100644 index 000000000..2b9d36386 --- /dev/null +++ b/docs/libp2p.perf.rst @@ -0,0 +1,41 @@ +libp2p.perf package +=================== + +The perf module implements the libp2p performance measurement protocol, +which allows measuring throughput between two libp2p nodes by sending +and receiving configurable amounts of data. + +Submodules +---------- + +libp2p.perf.constants module +---------------------------- + +.. automodule:: libp2p.perf.constants + :members: + :undoc-members: + :show-inheritance: + +libp2p.perf.types module +------------------------ + +.. automodule:: libp2p.perf.types + :members: + :undoc-members: + :show-inheritance: + +libp2p.perf.perf\_service module +-------------------------------- + +.. automodule:: libp2p.perf.perf_service + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.perf + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.rst b/docs/libp2p.rst index 62daa743d..c4a61f6a5 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -16,6 +16,7 @@ Subpackages libp2p.kad_dht libp2p.network libp2p.peer + libp2p.perf libp2p.protocol_muxer libp2p.pubsub libp2p.rcmgr From 25e7a767e127420849f8d54c640441e9c037b5a4 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 02:44:46 +0530 Subject: [PATCH 10/27] included the perf in examples.rst --- docs/examples.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/examples.rst b/docs/examples.rst index af7b8fbff..b4b71c35a 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -24,3 +24,4 @@ Examples examples.websocket examples.tls examples.autotls + examples.perf From 10d359d950491f0a216d6ac76c5a2045ae7bcb10 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 13:55:31 +0530 Subject: [PATCH 11/27] pushed the newsfragments --- newsfragments/1176.feature.rst | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 newsfragments/1176.feature.rst diff --git a/newsfragments/1176.feature.rst b/newsfragments/1176.feature.rst new file mode 100644 index 000000000..2b1ac8aea --- /dev/null +++ b/newsfragments/1176.feature.rst @@ -0,0 +1,9 @@ +Added implementation of the libp2p perf protocol for measuring transfer performance. + +The perf protocol allows benchmarking data transfer speeds between libp2p nodes. It includes: +- ``PerfService`` class with configurable inbound/outbound stream limits +- ``measure_performance()`` method to measure upload/download throughput to a remote peer +- Server-side handling for responding to perf protocol requests +- Detailed metrics output including latency, upload/download times, and throughput in bytes/second + +See the spec at https://github.com/libp2p/specs/blob/master/perf/perf.md From 7fdb85260da7984ecb2a5e64aa551abf1070c5c9 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 13:57:23 +0530 Subject: [PATCH 12/27] fixed thelength issue --- examples/perf/perf_example.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/perf/perf_example.py b/examples/perf/perf_example.py index 3387e421e..25b6ad4fc 100644 --- a/examples/perf/perf_example.py +++ b/examples/perf/perf_example.py @@ -69,12 +69,14 @@ async def run_client( if upload_bytes_out > 0: throughput = upload_bytes_out / time_s if time_s > 0 else 0 print( - f" Uploading: {upload_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)" + f" Upload: {upload_bytes_out}B in {time_s:.2f}s " + f"({throughput:.0f} B/s)" ) elif download_bytes_out > 0: throughput = download_bytes_out / time_s if time_s > 0 else 0 print( - f" Downloading: {download_bytes_out} bytes in {time_s:.2f}s ({throughput:.0f} bytes/s)" + f" Download: {download_bytes_out}B in {time_s:.2f}s " + f"({throughput:.0f} B/s)" ) elif output["type"] == "final": From 80e2b9614b6e2df97e09073e795700ecf7989f15 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 14:06:51 +0530 Subject: [PATCH 13/27] fixed some issues in perf_test.py --- interop/perf/perf_test.py | 39 +++++++++++++++++++++---------------- libp2p/perf/perf_service.py | 3 ++- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/interop/perf/perf_test.py b/interop/perf/perf_test.py index da8451fba..7216b911f 100644 --- a/interop/perf/perf_test.py +++ b/interop/perf/perf_test.py @@ -114,7 +114,6 @@ def _compute_stats(samples: list[float], is_latency: bool = False) -> dict[str, "samples": [], } sorted_vals = sorted(samples) - n = len(sorted_vals) q1 = _percentile(sorted_vals, 25.0) median = _percentile(sorted_vals, 50.0) q3 = _percentile(sorted_vals, 75.0) @@ -198,12 +197,14 @@ def validate_configuration(self) -> None: standalone = ["quic-v1"] if self.transport not in valid_transports: raise ValueError( - f"Unsupported transport: {self.transport}. Supported: {valid_transports}" + f"Unsupported transport: {self.transport}. " + f"Supported: {valid_transports}" ) if self.transport not in standalone: if self.security not in valid_security: raise ValueError( - f"Unsupported security: {self.security}. Supported: {valid_security}" + f"Unsupported security: {self.security}. " + f"Supported: {valid_security}" ) if self.muxer not in valid_muxers: raise ValueError( @@ -537,7 +538,8 @@ async def run_dialer(self) -> None: sec_opt, key_pair = self.create_security_options() muxer_opt = self.create_muxer_options() - # Dialer needs listen_addrs for ws/wss so transport is registered; for quic/tcp pass [] (host.run still starts swarm/nursery) + # Dialer needs listen_addrs for ws/wss so transport is registered; + # for quic/tcp pass [] (host.run still starts swarm/nursery) dialer_listen_addrs = ( self.create_listen_addresses(0) if self.transport in ["ws", "wss"] else None ) @@ -558,7 +560,8 @@ async def run_dialer(self) -> None: self.perf_service = PerfService(self.host) await self.perf_service.start() - # Must run host inside host.run() so swarm/nursery are active (required for connect and QUIC) + # Must run host inside host.run() so swarm/nursery are active + # (required for connect and QUIC) try: async with self.host.run(listen_addrs=dialer_listen_addrs or []): # Brief delay so listener is fully listening before we dial @@ -580,7 +583,7 @@ async def run_dialer(self) -> None: ) upload_samples.append(gbps) print( - f"Upload iteration {i + 1}/{self.upload_iterations}: {gbps:.2f} Gbps", + f"Upload {i + 1}/{self.upload_iterations}: {gbps:.2f} Gbps", file=sys.stderr, ) @@ -594,7 +597,7 @@ async def run_dialer(self) -> None: ) download_samples.append(gbps) print( - f"Download iteration {i + 1}/{self.download_iterations}: {gbps:.2f} Gbps", + f"Download {i + 1}/{self.download_iterations}: {gbps:.2f} Gbps", file=sys.stderr, ) @@ -606,7 +609,7 @@ async def run_dialer(self) -> None: u = _compute_stats(upload_samples, is_latency=False) d = _compute_stats(download_samples, is_latency=False) - l = _compute_stats(latency_samples, is_latency=True) + lat = _compute_stats(latency_samples, is_latency=True) # YAML to stdout only (per write-a-perf-test-app.md) print("upload:") @@ -631,16 +634,17 @@ async def run_dialer(self) -> None: print(" unit: Gbps") print("latency:") print(f" iterations: {self.latency_iterations}") - print(f" min: {l['min']:.3f}") - print(f" q1: {l['q1']:.3f}") - print(f" median: {l['median']:.3f}") - print(f" q3: {l['q3']:.3f}") - print(f" max: {l['max']:.3f}") - print(f" outliers: {l['outliers']}") - print(f" samples: {l['samples']}") + print(f" min: {lat['min']:.3f}") + print(f" q1: {lat['q1']:.3f}") + print(f" median: {lat['median']:.3f}") + print(f" q3: {lat['q3']:.3f}") + print(f" max: {lat['max']:.3f}") + print(f" outliers: {lat['outliers']}") + print(f" samples: {lat['samples']}") print(" unit: ms") - # Graceful close: disconnect listener so it sees a clean close, then stop services + # Graceful close: disconnect listener so it sees a clean + # close, then stop services try: await self.host.disconnect(listener_peer_id) await trio.sleep(0.5) @@ -656,7 +660,8 @@ async def run_dialer(self) -> None: except Exception: pass except BaseException as e: - # Swarm/mplex may raise "Connection closed" when we disconnect; treat as success + # Swarm/mplex may raise "Connection closed" on disconnect; + # treat as success if not _is_connection_closed_error(e): raise diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index a680cbaa6..d794e6628 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -214,7 +214,8 @@ async def measure_performance( ) # Close the write side to signal we're done sending - # Note: close_write() is available on NetStream but not on INetStream interface + # Note: close_write() is available on NetStream + # but not on INetStream interface if hasattr(stream, "close_write"): await stream.close_write() # type: ignore[attr-defined] From a6b029a6953a4f784b41523eed9b5e9525ae25aa Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 16:04:00 +0530 Subject: [PATCH 14/27] fixed the 8bytes logic error --- libp2p/perf/perf_service.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index d794e6628..6664ccd21 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -89,30 +89,34 @@ async def _handle_message(self, stream: INetStream) -> None: from the first 8 bytes, then sends that many bytes back. """ try: - bytes_to_send_back: int | None = None + # Read exactly 8 bytes for the header (handle TCP fragmentation) + header = b"" + while len(header) < 8: + try: + chunk = await stream.read(8 - len(header)) + if not chunk: + logger.error("Stream closed before header was fully received") + await stream.reset() + return + header += chunk + except Exception: + logger.error("Error reading header") + await stream.reset() + return + + # Parse the big-endian unsigned 64-bit integer + bytes_to_send_back = struct.unpack(">Q", header)[0] + logger.debug("Received request to send back %d bytes", bytes_to_send_back) - # Read all incoming data + # Read remaining data until EOF (client closes write side) while True: try: data = await stream.read(self._write_block_size) if not data: break - - # First 8 bytes contain the number of bytes to send back - if bytes_to_send_back is None and len(data) >= 8: - # Big-endian unsigned 64-bit integer - bytes_to_send_back = struct.unpack(">Q", data[:8])[0] - logger.debug( - "Received request to send back %d bytes", bytes_to_send_back - ) except Exception: break - if bytes_to_send_back is None: - logger.error("bytes_to_send_back was not set") - await stream.reset() - return - # Send back the requested number of bytes while bytes_to_send_back > 0: to_send = min(self._write_block_size, bytes_to_send_back) From dad99d49e50289461560b6f2a016fc9ddb63a432 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 3 Feb 2026 22:20:38 +0530 Subject: [PATCH 15/27] tests for the perf --- tests/core/perf/__init__.py | 1 + tests/core/perf/conftest.py | 71 +++++++++++++ tests/core/perf/test_perf_client.py | 148 ++++++++++++++++++++++++++++ tests/core/perf/test_perf_errors.py | 146 +++++++++++++++++++++++++++ tests/core/perf/test_perf_server.py | 83 ++++++++++++++++ 5 files changed, 449 insertions(+) create mode 100644 tests/core/perf/__init__.py create mode 100644 tests/core/perf/conftest.py create mode 100644 tests/core/perf/test_perf_client.py create mode 100644 tests/core/perf/test_perf_errors.py create mode 100644 tests/core/perf/test_perf_server.py diff --git a/tests/core/perf/__init__.py b/tests/core/perf/__init__.py new file mode 100644 index 000000000..ecd9f41d6 --- /dev/null +++ b/tests/core/perf/__init__.py @@ -0,0 +1 @@ +# Empty init file for perf tests package. diff --git a/tests/core/perf/conftest.py b/tests/core/perf/conftest.py new file mode 100644 index 000000000..6cdc3f5b6 --- /dev/null +++ b/tests/core/perf/conftest.py @@ -0,0 +1,71 @@ +"""Shared fixtures and mocks for perf protocol tests.""" + +import pytest + +from libp2p.perf import PerfService + + +class MockNetStream: + """Mock stream for testing server handler in isolation.""" + + def __init__( + self, + data_to_read: bytes, + *, + raise_on_read: bool = False, + close_after_bytes: int | None = None, + raise_after_bytes: int | None = None, + ): + self._read_buffer = data_to_read + self._write_buffer = b"" + self._reset_called = False + self._closed = False + self._raise_on_read = raise_on_read + self._close_after_bytes = close_after_bytes + self._raise_after_bytes = raise_after_bytes + self._bytes_read = 0 + + async def read(self, n: int) -> bytes: + if self._raise_on_read: + raise ConnectionError("Mock read error") + + if self._raise_after_bytes is not None: + if self._bytes_read >= self._raise_after_bytes: + raise ConnectionError("Mock read error after bytes") + + if self._close_after_bytes is not None: + if self._bytes_read >= self._close_after_bytes: + return b"" + + chunk = self._read_buffer[:n] + self._read_buffer = self._read_buffer[n:] + self._bytes_read += len(chunk) + return chunk + + async def write(self, data: bytes) -> None: + self._write_buffer += data + + async def reset(self) -> None: + self._reset_called = True + + async def close(self) -> None: + self._closed = True + + +class MockHost: + """Minimal mock host for PerfService initialization.""" + + def set_stream_handler(self, protocol, handler): + pass + + +@pytest.fixture +def mock_host(): + """Provide a mock host for testing.""" + return MockHost() + + +@pytest.fixture +def perf_service(mock_host): + """Provide a PerfService instance for testing.""" + return PerfService(mock_host) diff --git a/tests/core/perf/test_perf_client.py b/tests/core/perf/test_perf_client.py new file mode 100644 index 000000000..82ec80496 --- /dev/null +++ b/tests/core/perf/test_perf_client.py @@ -0,0 +1,148 @@ +"""Tests for perf protocol client-side functionality.""" + +import struct + +import pytest + +from libp2p.perf import PerfService +from tests.utils.factories import host_pair_factory + + +@pytest.mark.trio +async def test_client_upload_small_size(security_protocol): + """Test client upload with small sizes: 0, 1, 100, 1024 bytes.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + # Test various small upload sizes + test_sizes = [0, 1, 100, 1024] + + for send_bytes in test_sizes: + recv_bytes = 0 # No download for this test + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + final = results[-1] + assert final["type"] == "final" + assert final["upload_bytes"] == send_bytes, ( + f"Expected upload_bytes={send_bytes}, got {final['upload_bytes']}" + ) + + +@pytest.mark.trio +async def test_client_download_small_size(security_protocol): + """Test client download with small sizes: 0, 1, 100, 1024 bytes.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + # Test various small download sizes + test_sizes = [0, 1, 100, 1024] + + for recv_bytes in test_sizes: + send_bytes = 8 # Minimal upload (just header size worth) + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + final = results[-1] + assert final["type"] == "final" + assert final["download_bytes"] == recv_bytes, ( + f"Expected download_bytes={recv_bytes}, got {final['download_bytes']}" + ) + + +@pytest.mark.trio +async def test_client_asymmetric_upload_download(security_protocol): + """Test asymmetric transfer: upload 100 bytes, download 1000 bytes.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + send_bytes = 100 + recv_bytes = 1000 + + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + final = results[-1] + assert final["type"] == "final" + assert final["upload_bytes"] == send_bytes + assert final["download_bytes"] == recv_bytes + + +@pytest.mark.trio +async def test_client_final_output_correct(security_protocol): + """Verify final PerfOutput has correct structure and byte counts.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + send_bytes = 512 + recv_bytes = 256 + + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + # Verify we got at least a final result + assert len(results) >= 1 + + final = results[-1] + assert final["type"] == "final" + assert final["upload_bytes"] == send_bytes + assert final["download_bytes"] == recv_bytes + assert final["time_seconds"] > 0 + + +def test_client_header_format(): + """Verify the 8-byte header format is correct (unit test).""" + # Test that the header format used by the client is correct + # The client uses struct.pack(">Q", recv_bytes) to create the header + + test_values = [0, 1, 100, 1024, 65536, 1024 * 1024] + + for recv_bytes in test_values: + header = struct.pack(">Q", recv_bytes) + + # Verify header is exactly 8 bytes + assert len(header) == 8 + + # Verify it can be correctly parsed back + parsed = struct.unpack(">Q", header)[0] + assert parsed == recv_bytes diff --git a/tests/core/perf/test_perf_errors.py b/tests/core/perf/test_perf_errors.py new file mode 100644 index 000000000..29b60f76c --- /dev/null +++ b/tests/core/perf/test_perf_errors.py @@ -0,0 +1,146 @@ +"""Tests for perf protocol error handling paths.""" + +import struct + +import pytest + +from libp2p.custom_types import TProtocol +from libp2p.perf import PerfService +from tests.utils.factories import host_pair_factory + +from .conftest import MockNetStream + +# ============================================================================= +# Server Error Path Tests +# ============================================================================= + + +@pytest.mark.trio +async def test_server_stream_reset_before_header(perf_service): + """Server handles stream reset/close before receiving any header bytes.""" + # Stream closes immediately with no data + stream = MockNetStream(b"", close_after_bytes=0) + await perf_service._handle_message(stream) + + # Server should reset the stream on error + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_incomplete_header(perf_service): + """Server handles incomplete header (< 8 bytes received before stream closes).""" + # Only 4 bytes of header, then stream closes + incomplete_header = b"\x00\x00\x00\x01" # Only 4 bytes + stream = MockNetStream(incomplete_header) + await perf_service._handle_message(stream) + + # Server should reset the stream when header is incomplete + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_error_during_read(perf_service): + """Server properly resets stream on read errors.""" + # Stream will raise an error on first read + stream = MockNetStream(b"", raise_on_read=True) + await perf_service._handle_message(stream) + + # Server should reset the stream on read error + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_read_error_after_partial_header(perf_service): + """Server resets stream when read error occurs after partial header.""" + # Provide 4 bytes, then error on next read + partial_header = b"\x00\x00\x00\x01" + stream = MockNetStream(partial_header, raise_after_bytes=4) + await perf_service._handle_message(stream) + + # Server should reset the stream + assert stream._reset_called is True + + +# ============================================================================= +# Client Error Path Tests +# ============================================================================= + + +@pytest.mark.trio +async def test_client_stream_reset_mid_upload(security_protocol): + """Client handles stream reset during upload phase.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Server that resets stream after reading some bytes + async def resetting_server_handler(stream): + # Read a bit then reset + await stream.read(100) + await stream.reset() + + host_a.set_stream_handler(TProtocol("/perf/1.0.0"), resetting_server_handler) + + client = PerfService(host_b) + + recv_bytes = 1000 + send_bytes = 10000 # Large upload to ensure we hit the reset + + # Should raise an exception due to stream reset + with pytest.raises(Exception): + async for _ in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + pass + + +@pytest.mark.trio +async def test_client_byte_count_mismatch(security_protocol): + """Client raises ValueError when server sends fewer bytes than requested.""" + from libp2p.network.stream.exceptions import StreamEOF + + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Server that sends fewer bytes than the client expects + async def short_response_handler(stream): + # Read the 8-byte header to get requested bytes + header = await stream.read(8) + requested_bytes = struct.unpack(">Q", header)[0] + + # Drain the upload data (handle EOF when client finishes sending) + try: + while True: + data = await stream.read(4096) + if not data: + break + except StreamEOF: + pass # Expected when client closes write side + + # Intentionally send fewer bytes than requested + bytes_to_send = max(0, requested_bytes - 100) + await stream.write(b"\x00" * bytes_to_send) + await stream.close() + + host_a.set_stream_handler(TProtocol("/perf/1.0.0"), short_response_handler) + + client = PerfService(host_b) + + recv_bytes = 1000 # Request 1000 bytes + send_bytes = 100 + + # Should raise ValueError due to byte count mismatch + with pytest.raises(ValueError) as exc_info: + async for _ in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + pass + + # Verify the error message contains expected information + error_msg = str(exc_info.value) + assert "Expected to receive" in error_msg or "expected" in error_msg.lower() diff --git a/tests/core/perf/test_perf_server.py b/tests/core/perf/test_perf_server.py new file mode 100644 index 000000000..989389da4 --- /dev/null +++ b/tests/core/perf/test_perf_server.py @@ -0,0 +1,83 @@ +"""Tests for perf protocol server-side functionality.""" + +import struct + +import pytest + +from .conftest import MockNetStream + + +@pytest.mark.trio +async def test_parse_valid_8_byte_header(perf_service): + """Verify server correctly parses a valid 8-byte big-endian header.""" + # Create header requesting 1024 bytes back + recv_bytes = 1024 + header = struct.pack(">Q", recv_bytes) + # Add some upload data after header + upload_data = b"x" * 100 + + stream = MockNetStream(header + upload_data) + await perf_service._handle_message(stream) + + # Server should have written exactly 1024 bytes back + assert len(stream._write_buffer) == recv_bytes + assert stream._closed is True + assert stream._reset_called is False + + +@pytest.mark.trio +async def test_server_extracts_response_length(perf_service): + """Verify server extracts correct bytes_to_send_back value from header.""" + # Test with specific value: 5000 bytes + recv_bytes = 5000 + header = struct.pack(">Q", recv_bytes) + + stream = MockNetStream(header) + await perf_service._handle_message(stream) + + # Server should send exactly 5000 bytes + assert len(stream._write_buffer) == recv_bytes + + +@pytest.mark.trio +async def test_server_handles_zero_response_length(perf_service): + """Edge case: header requests 0 bytes back.""" + recv_bytes = 0 + header = struct.pack(">Q", recv_bytes) + upload_data = b"y" * 50 + + stream = MockNetStream(header + upload_data) + await perf_service._handle_message(stream) + + # Server should write 0 bytes back + assert len(stream._write_buffer) == 0 + assert stream._closed is True + + +@pytest.mark.trio +async def test_server_handles_large_response_length(perf_service): + """Edge case: header requests large bytes back (1MB).""" + recv_bytes = 1024 * 1024 # 1MB + header = struct.pack(">Q", recv_bytes) + + stream = MockNetStream(header) + await perf_service._handle_message(stream) + + # Server should send exactly 1MB + assert len(stream._write_buffer) == recv_bytes + assert stream._closed is True + + +@pytest.mark.trio +async def test_server_sends_correct_byte_count(perf_service): + """Verify server sends exactly the requested number of bytes.""" + test_cases = [1, 100, 1024, 65536] + + for recv_bytes in test_cases: + header = struct.pack(">Q", recv_bytes) + stream = MockNetStream(header) + await perf_service._handle_message(stream) + + assert len(stream._write_buffer) == recv_bytes, ( + f"Expected {recv_bytes} bytes, got {len(stream._write_buffer)}" + ) From c0298daa351a952ea483c867d107a4a7f91e9199 Mon Sep 17 00:00:00 2001 From: Mohan Date: Mon, 9 Feb 2026 11:28:17 +0530 Subject: [PATCH 16/27] changes the newsfragment name to the issue number --- newsfragments/{1176.feature.rst => 1169.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename newsfragments/{1176.feature.rst => 1169.feature.rst} (100%) diff --git a/newsfragments/1176.feature.rst b/newsfragments/1169.feature.rst similarity index 100% rename from newsfragments/1176.feature.rst rename to newsfragments/1169.feature.rst From a4b5c2835c5a3fd47aa2a274f8d6e5de088017ce Mon Sep 17 00:00:00 2001 From: Mohan Date: Mon, 9 Feb 2026 13:30:43 +0530 Subject: [PATCH 17/27] fixed the lint errors in init --- libp2p/perf/perf_service.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 6664ccd21..469bca365 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -47,18 +47,19 @@ def __init__(self, host: IHost, init: PerfInit | None = None) -> None: Initialization options for the service. """ - if init is None: - init = {} + init_opts: PerfInit = init if init is not None else {} self._host = host self._started = False - self._protocol = TProtocol(init.get("protocol_name", PROTOCOL_NAME)) - self._write_block_size = init.get("write_block_size", WRITE_BLOCK_SIZE) - self._max_inbound_streams = init.get("max_inbound_streams", MAX_INBOUND_STREAMS) - self._max_outbound_streams = init.get( + self._protocol = TProtocol(init_opts.get("protocol_name", PROTOCOL_NAME)) + self._write_block_size = init_opts.get("write_block_size", WRITE_BLOCK_SIZE) + self._max_inbound_streams = init_opts.get( + "max_inbound_streams", MAX_INBOUND_STREAMS + ) + self._max_outbound_streams = init_opts.get( "max_outbound_streams", MAX_OUTBOUND_STREAMS ) - self._run_on_limited_connection = init.get( + self._run_on_limited_connection = init_opts.get( "run_on_limited_connection", RUN_ON_LIMITED_CONNECTION ) @@ -90,7 +91,7 @@ async def _handle_message(self, stream: INetStream) -> None: """ try: # Read exactly 8 bytes for the header (handle TCP fragmentation) - header = b"" + header: bytes = b"" while len(header) < 8: try: chunk = await stream.read(8 - len(header)) @@ -156,8 +157,7 @@ async def measure_performance( Progress reports during the transfer, with a final summary at the end. """ - if options is None: - options = {} + opts: PerfOptions = options if options is not None else {} initial_start_time = time.time() last_reported_time = time.time() From ccb0c5825e7e4ecb05f544ad78c1beb44e30e4b1 Mon Sep 17 00:00:00 2001 From: Mohan Date: Mon, 9 Feb 2026 13:40:47 +0530 Subject: [PATCH 18/27] added the docstring about the high data to send back and resouce exhaustion --- libp2p/perf/perf_service.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 469bca365..dbb70fb64 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -1,6 +1,13 @@ """ Perf protocol service implementation. Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md + +Note: + This service is designed for benchmarking and performance testing. + The server accepts any uint64 value for bytes to send back, which + is intentional to support high-volume stress testing. This service + should only be enabled in trusted environments or test networks. + """ from collections.abc import AsyncIterator From d7392552d9addbc9795f6bcfb45bcec40922e5d6 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 10 Feb 2026 13:43:33 +0530 Subject: [PATCH 19/27] fixed the perfoptions and header type issue --- libp2p/perf/__init__.py | 2 -- libp2p/perf/perf_service.py | 8 ++------ libp2p/perf/types.py | 6 ------ 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/libp2p/perf/__init__.py b/libp2p/perf/__init__.py index 67df485a0..cbd3252bc 100644 --- a/libp2p/perf/__init__.py +++ b/libp2p/perf/__init__.py @@ -15,7 +15,6 @@ from .types import ( PerfComponents, PerfInit, - PerfOptions, PerfOutput, ) from libp2p.abc import IPerf @@ -31,7 +30,6 @@ # Types "PerfOutput", "PerfInit", - "PerfOptions", "PerfComponents", # Interface "IPerf", diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index dbb70fb64..01a32ad9e 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -29,7 +29,7 @@ RUN_ON_LIMITED_CONNECTION, WRITE_BLOCK_SIZE, ) -from .types import PerfInit, PerfOptions, PerfOutput +from .types import PerfInit, PerfOutput logger = logging.getLogger(__name__) @@ -113,6 +113,7 @@ async def _handle_message(self, stream: INetStream) -> None: return # Parse the big-endian unsigned 64-bit integer + header = bytes(header) bytes_to_send_back = struct.unpack(">Q", header)[0] logger.debug("Received request to send back %d bytes", bytes_to_send_back) @@ -142,7 +143,6 @@ async def measure_performance( multiaddr: Multiaddr, send_bytes: int, recv_bytes: int, - options: PerfOptions | None = None, ) -> AsyncIterator[PerfOutput]: """ Measure transfer performance to a remote peer. @@ -155,8 +155,6 @@ async def measure_performance( Number of bytes to upload to the remote peer. recv_bytes : int Number of bytes to request the remote peer to send back. - options : PerfOptions, optional - Options for the performance run. Yields ------ @@ -164,8 +162,6 @@ async def measure_performance( Progress reports during the transfer, with a final summary at the end. """ - opts: PerfOptions = options if options is not None else {} - initial_start_time = time.time() last_reported_time = time.time() diff --git a/libp2p/perf/types.py b/libp2p/perf/types.py index 4e25803ad..431384e2d 100644 --- a/libp2p/perf/types.py +++ b/libp2p/perf/types.py @@ -33,12 +33,6 @@ class PerfInit(TypedDict, total=False): write_block_size: int # Default: 65536 (64KB) -class PerfOptions(TypedDict, total=False): - """Options for a performance measurement run.""" - - reuse_existing_connection: bool # Default: False - - class PerfComponents(TypedDict): """Components required by the perf service.""" From 8493989ca08911d133e8088fd45f7ab8ff5448e6 Mon Sep 17 00:00:00 2001 From: Mohan Date: Thu, 12 Feb 2026 23:44:18 +0530 Subject: [PATCH 20/27] fixed the lint issue by changing the type of chunk --- libp2p/perf/perf_service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 01a32ad9e..6786f5a3a 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -101,7 +101,7 @@ async def _handle_message(self, stream: INetStream) -> None: header: bytes = b"" while len(header) < 8: try: - chunk = await stream.read(8 - len(header)) + chunk: bytes = await stream.read(8 - len(header)) if not chunk: logger.error("Stream closed before header was fully received") await stream.reset() @@ -113,7 +113,6 @@ async def _handle_message(self, stream: INetStream) -> None: return # Parse the big-endian unsigned 64-bit integer - header = bytes(header) bytes_to_send_back = struct.unpack(">Q", header)[0] logger.debug("Received request to send back %d bytes", bytes_to_send_back) From d0cbd229a64cf36ad63cd5dd40c0263261fb71b8 Mon Sep 17 00:00:00 2001 From: Mohan Date: Fri, 13 Feb 2026 01:45:59 +0530 Subject: [PATCH 21/27] fixed teh lint errors by the ignore --- libp2p/perf/perf_service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 6786f5a3a..245397912 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -14,6 +14,7 @@ import logging import struct import time +from typing import cast from multiaddr import Multiaddr @@ -101,7 +102,7 @@ async def _handle_message(self, stream: INetStream) -> None: header: bytes = b"" while len(header) < 8: try: - chunk: bytes = await stream.read(8 - len(header)) + chunk = cast(bytes, await stream.read(8 - len(header))) # type: ignore[redundant-cast] if not chunk: logger.error("Stream closed before header was fully received") await stream.reset() From 08372ca6cf67bcd5aba8cc97f5e83dabbb339e22 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 17 Feb 2026 00:24:39 +0530 Subject: [PATCH 22/27] used the .started variable in the handle message function --- libp2p/perf/perf_service.py | 8 ++++++++ tests/core/perf/conftest.py | 6 ++++-- tests/core/perf/test_perf_errors.py | 11 +++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 245397912..88474aa52 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -97,6 +97,14 @@ async def _handle_message(self, stream: INetStream) -> None: Reads data from the stream, extracts the number of bytes to send back from the first 8 bytes, then sends that many bytes back. """ + if not self._started: + logger.debug("Perf service received stream while stopped; resetting") + try: + await stream.reset() + except Exception: + logger.debug("Failed to reset stopped perf stream", exc_info=True) + return + try: # Read exactly 8 bytes for the header (handle TCP fragmentation) header: bytes = b"" diff --git a/tests/core/perf/conftest.py b/tests/core/perf/conftest.py index 6cdc3f5b6..631c11f7b 100644 --- a/tests/core/perf/conftest.py +++ b/tests/core/perf/conftest.py @@ -66,6 +66,8 @@ def mock_host(): @pytest.fixture -def perf_service(mock_host): +async def perf_service(mock_host): """Provide a PerfService instance for testing.""" - return PerfService(mock_host) + service = PerfService(mock_host) + await service.start() + return service diff --git a/tests/core/perf/test_perf_errors.py b/tests/core/perf/test_perf_errors.py index 29b60f76c..a19cac330 100644 --- a/tests/core/perf/test_perf_errors.py +++ b/tests/core/perf/test_perf_errors.py @@ -61,6 +61,17 @@ async def test_server_read_error_after_partial_header(perf_service): assert stream._reset_called is True +@pytest.mark.trio +async def test_server_resets_stream_when_service_stopped(perf_service): + """Stopped service should reject inbound perf streams.""" + await perf_service.stop() + stream = MockNetStream(struct.pack(">Q", 64) + b"x" * 32) + await perf_service._handle_message(stream) + + assert stream._reset_called is True + assert stream._write_buffer == b"" + + # ============================================================================= # Client Error Path Tests # ============================================================================= From a2a922ad5acdb065e72e7f272e5b7c9c71a35598 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 17 Feb 2026 01:34:21 +0530 Subject: [PATCH 23/27] removed the configs that are not implemented in the set_stream_handler function --- libp2p/abc.py | 2 +- libp2p/perf/__init__.py | 6 ------ libp2p/perf/constants.py | 3 --- libp2p/perf/perf_service.py | 12 ------------ libp2p/perf/types.py | 3 --- 5 files changed, 1 insertion(+), 25 deletions(-) diff --git a/libp2p/abc.py b/libp2p/abc.py index 39a08d5a7..e187d90eb 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -1532,7 +1532,7 @@ async def upgrade_outbound_raw_conn( Parameters ---------- - raw_conn : IRawConnection + raw_coset_strenn : IRawConnection The raw connection to upgrade. peer_id : ID The peer to which this connection is established. diff --git a/libp2p/perf/__init__.py b/libp2p/perf/__init__.py index cbd3252bc..2d0ffde5e 100644 --- a/libp2p/perf/__init__.py +++ b/libp2p/perf/__init__.py @@ -6,10 +6,7 @@ """ from .constants import ( - MAX_INBOUND_STREAMS, - MAX_OUTBOUND_STREAMS, PROTOCOL_NAME, - RUN_ON_LIMITED_CONNECTION, WRITE_BLOCK_SIZE, ) from .types import ( @@ -24,9 +21,6 @@ # Constants "PROTOCOL_NAME", "WRITE_BLOCK_SIZE", - "MAX_INBOUND_STREAMS", - "MAX_OUTBOUND_STREAMS", - "RUN_ON_LIMITED_CONNECTION", # Types "PerfOutput", "PerfInit", diff --git a/libp2p/perf/constants.py b/libp2p/perf/constants.py index f4068d43d..58600422a 100644 --- a/libp2p/perf/constants.py +++ b/libp2p/perf/constants.py @@ -3,6 +3,3 @@ PROTOCOL_NAME = "/perf/1.0.0" WRITE_BLOCK_SIZE = 65500 -MAX_INBOUND_STREAMS = 1 -MAX_OUTBOUND_STREAMS = 1 -RUN_ON_LIMITED_CONNECTION = False diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py index 88474aa52..55798572b 100644 --- a/libp2p/perf/perf_service.py +++ b/libp2p/perf/perf_service.py @@ -24,10 +24,7 @@ from libp2p.peer.peerinfo import PeerInfo from .constants import ( - MAX_INBOUND_STREAMS, - MAX_OUTBOUND_STREAMS, PROTOCOL_NAME, - RUN_ON_LIMITED_CONNECTION, WRITE_BLOCK_SIZE, ) from .types import PerfInit, PerfOutput @@ -61,15 +58,6 @@ def __init__(self, host: IHost, init: PerfInit | None = None) -> None: self._started = False self._protocol = TProtocol(init_opts.get("protocol_name", PROTOCOL_NAME)) self._write_block_size = init_opts.get("write_block_size", WRITE_BLOCK_SIZE) - self._max_inbound_streams = init_opts.get( - "max_inbound_streams", MAX_INBOUND_STREAMS - ) - self._max_outbound_streams = init_opts.get( - "max_outbound_streams", MAX_OUTBOUND_STREAMS - ) - self._run_on_limited_connection = init_opts.get( - "run_on_limited_connection", RUN_ON_LIMITED_CONNECTION - ) # Pre-allocate buffer for sending data self._buf = bytes(self._write_block_size) diff --git a/libp2p/perf/types.py b/libp2p/perf/types.py index 431384e2d..11e117daf 100644 --- a/libp2p/perf/types.py +++ b/libp2p/perf/types.py @@ -27,9 +27,6 @@ class PerfInit(TypedDict, total=False): """Initialization options for the perf service.""" protocol_name: str - max_inbound_streams: int - max_outbound_streams: int - run_on_limited_connection: bool write_block_size: int # Default: 65536 (64KB) From 5a306b31538d2d82ffc5cfd7fb0bb2bf117ec1dd Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 17 Feb 2026 10:35:22 +0530 Subject: [PATCH 24/27] fixed the typo --- libp2p/abc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/abc.py b/libp2p/abc.py index e187d90eb..39a08d5a7 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -1532,7 +1532,7 @@ async def upgrade_outbound_raw_conn( Parameters ---------- - raw_coset_strenn : IRawConnection + raw_conn : IRawConnection The raw connection to upgrade. peer_id : ID The peer to which this connection is established. From 5aecd09f4aee498260de57e4ef15dc369c55cc5e Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 17 Feb 2026 10:39:18 +0530 Subject: [PATCH 25/27] removed the config option detail from the newsfragment --- newsfragments/1169.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/1169.feature.rst b/newsfragments/1169.feature.rst index 2b1ac8aea..6b935ccc8 100644 --- a/newsfragments/1169.feature.rst +++ b/newsfragments/1169.feature.rst @@ -1,7 +1,7 @@ Added implementation of the libp2p perf protocol for measuring transfer performance. The perf protocol allows benchmarking data transfer speeds between libp2p nodes. It includes: -- ``PerfService`` class with configurable inbound/outbound stream limits +- ``PerfService`` class for perf protocol server/client operations - ``measure_performance()`` method to measure upload/download throughput to a remote peer - Server-side handling for responding to perf protocol requests - Detailed metrics output including latency, upload/download times, and throughput in bytes/second From a5acc0388191d53086c936ba2225e8bc2335d80a Mon Sep 17 00:00:00 2001 From: acul71 <34693171+acul71@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:01:45 +0100 Subject: [PATCH 26/27] fix: resolve perf interop addr rebuild and align block size Use multiaddr items when rebuilding loopback-rewritten addresses in interop perf so protocol/value pairs are handled correctly, and align perf write block size default to 64KiB for js-libp2p parity. Co-authored-by: Cursor --- interop/perf/perf_test.py | 6 +++--- libp2p/perf/constants.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/interop/perf/perf_test.py b/interop/perf/perf_test.py index 7216b911f..ae376b2d5 100644 --- a/interop/perf/perf_test.py +++ b/interop/perf/perf_test.py @@ -426,12 +426,12 @@ def _replace_loopback_ip(self, addr: multiaddr.Multiaddr) -> str: is_ipv6 = "ip6" in names parts = [f"/ip6/{actual}" if is_ipv6 else f"/ip4/{actual}"] found = False - for p in addr.protocols(): - if p.name in ["ip4", "ip6"]: + for proto, value in addr.items(): + if proto.name in ["ip4", "ip6"]: found = True continue if found: - parts.append(f"/{p.name}/{p.value}" if p.value else f"/{p.name}") + parts.append(f"/{proto.name}/{value}" if value else f"/{proto.name}") return str(multiaddr.Multiaddr("".join(parts))) def _get_publishable_address(self, addresses: list[multiaddr.Multiaddr]) -> str: diff --git a/libp2p/perf/constants.py b/libp2p/perf/constants.py index 58600422a..a801967de 100644 --- a/libp2p/perf/constants.py +++ b/libp2p/perf/constants.py @@ -2,4 +2,4 @@ # https://github.com/libp2p/specs/blob/master/perf/perf.md PROTOCOL_NAME = "/perf/1.0.0" -WRITE_BLOCK_SIZE = 65500 +WRITE_BLOCK_SIZE = 65536 From b43d971a2b5784e2311424a07214fbc1ce0b6928 Mon Sep 17 00:00:00 2001 From: acul71 <34693171+acul71@users.noreply.github.com> Date: Sat, 28 Feb 2026 03:53:54 +0100 Subject: [PATCH 27/27] doc: interop/ dir README.md for unified-test new repo --- interop/README.md | 86 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 interop/README.md diff --git a/interop/README.md b/interop/README.md new file mode 100644 index 000000000..512a9ee86 --- /dev/null +++ b/interop/README.md @@ -0,0 +1,86 @@ +# Interop Directory + +This directory contains **interoperability and performance test material** for py-libp2p, designed to integrate with the [unified-testing framework](https://github.com/libp2p/unified-testing) (successor to libp2p/test-plans). + +## Purpose + +The `interop/` directory houses the Docker images, test scripts, and configuration needed to run py-libp2p in cross-implementation tests. The unified-testing convention allows this material to live in the implementation repository itself, which: + +- Enables **local testing** against other libp2p implementations (Rust, Go, JS, .NET) without syncing between repos +- Serves as an **example** for setting up Docker-based protocol tests with py-libp2p +- Keeps test implementations versioned and developed alongside the library + +## Directory Layout + +``` +interop/ +├── perf/ # Performance (throughput, latency) tests +│ ├── Dockerfile # Builds image for perf protocol testing +│ ├── perf_test.py # Test application (listener + dialer) +│ └── pyproject.toml # Dependencies (libp2p, redis, etc.) +└── transport/ # Transport interoperability tests + ├── Dockerfile + ├── ping_test.py + └── pyproject.toml +``` + +Each subdirectory corresponds to a **test type** in the unified-testing framework. + +## How It Integrates with Unified-Testing + +The unified-testing framework (see `unified-testing/docs/`) runs tests by: + +1. **Building Docker images** – Uses `images.yaml` to define implementations. For py-libp2p, the build uses `source.type: github` (or `local`) pointing at this repo, with a `dockerfile` path such as `interop/perf/Dockerfile`. +1. **Running tests** – The framework starts **listener** and **dialer** containers on a shared network, coordinates them via **Redis**, and collects results. + +### Perf Tests (`interop/perf/`) + +Perf tests measure: + +- **Upload throughput** – How fast the dialer sends data to the listener +- **Download throughput** – How fast the dialer receives data from the listener +- **Latency** – Round-trip time for small messages + +The test app (`perf_test.py`) implements the [libp2p perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md) (`/perf/1.0.0`) and follows [write-a-perf-test-app.md](https://github.com/libp2p/unified-testing/blob/master/docs/write-a-perf-test-app.md): + +- Reads config from environment variables (`IS_DIALER`, `REDIS_ADDR`, `TEST_KEY`, `TRANSPORT`, etc.) +- Listener publishes its multiaddr to Redis; dialer polls and connects +- Dialer runs upload/download/latency iterations and outputs YAML results to stdout +- All logging goes to stderr (stdout is reserved for results) + +### Transport Tests (`interop/transport/`) + +Transport tests verify that py-libp2p can establish connections and exchange protocols with other implementations over various transport, secure channel, and muxer combinations (TCP, QUIC, WebSocket, Noise, TLS, yamux, mplex). + +## Build Context + +When building from the py-libp2p repo: + +- **Build context** = repository root (not `interop/perf/` or `interop/transport/`) +- The Dockerfile uses `COPY . /app/py-libp2p` to include the full libp2p source, then copies the test script and installs dependencies so the test app uses the in-repo libp2p. + +This ensures each Docker image is built against the exact py-libp2p version in the repo or specified commit. + +## Running Locally + +To run interop tests, use the unified-testing framework: + +```bash +# From the unified-testing repo +cd perf +./run.sh --impl-select "python-v0.x" # When python is in images.yaml +``` + +To run the perf test script directly (e.g. for development), see `examples/perf/perf_example.py` and the [perf protocol documentation](../../libp2p/perf/). + +## Relationship to CI + +- Code in `interop/` is **not** run by py-libp2p's own CI (which uses `tests/`). +- The unified-testing framework runs this code when py-libp2p is included in `images.yaml` and the perf/transport test suite is executed (e.g. in the test-plans or unified-testing repo). + +## References + +- [Unified-testing framework](https://github.com/libp2p/unified-testing) – Bash + Docker test runner +- [write-a-perf-test-app.md](https://github.com/libp2p/unified-testing/blob/master/docs/write-a-perf-test-app.md) – Perf test app specification +- [write-a-transport-test-app.md](https://github.com/libp2p/unified-testing/blob/master/docs/write-a-transport-test-app.md) – Transport test app specification +- [libp2p perf protocol spec](https://github.com/libp2p/specs/blob/master/perf/perf.md)