diff --git a/docs/examples.perf.rst b/docs/examples.perf.rst new file mode 100644 index 000000000..00b0532f2 --- /dev/null +++ b/docs/examples.perf.rst @@ -0,0 +1,69 @@ +Perf Protocol Demo +================== + +This example demonstrates how to use the libp2p ``perf`` protocol to measure +transfer performance between two nodes. + +The perf protocol sends and receives data to measure throughput, reporting +both intermediary progress and final results. + +Running the Example +------------------- + +First, start the server in one terminal: + +.. code-block:: console + + $ python examples/perf/perf_example.py -p 8000 + + Perf server ready, listening on: + /ip4/127.0.0.1/tcp/8000/p2p/QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN + + Protocol: /perf/1.0.0 + + Run client with: + python perf_example.py -d /ip4/127.0.0.1/tcp/8000/p2p/QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN + + Waiting for incoming perf requests... + +Then, in another terminal, run the client with the multiaddr from the server: + +.. code-block:: console + + $ python examples/perf/perf_example.py -d /ip4/127.0.0.1/tcp/8000/p2p/QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN + + Connecting to QmXfptdHU6hqG95JswxYVUH4bphcK8y18mhFcgUQFe6fCN... + Connected! + + Measuring performance: + Upload: 2560 bytes + Download: 2560 bytes + + Uploading: 2560 bytes in 0.01s (256000 bytes/s) + Downloading: 2560 bytes in 0.01s (256000 bytes/s) + + ================================================== + Performance Results: + Total time: 0.025 seconds + Uploaded: 2560 bytes + Downloaded: 2560 bytes + Total data: 5120 bytes + Throughput: 204800 bytes/s + ================================================== + +Command Line Options +-------------------- + +.. code-block:: text + + -p, --port Listening port (default: random free port) + -d, --destination Destination multiaddr (if not set, runs as server) + -u, --upload Upload size in units of 256 bytes (default: 10) + -D, --download Download size in units of 256 bytes (default: 10) + +Source Code +----------- + +.. literalinclude:: ../examples/perf/perf_example.py + :language: python + :linenos: diff --git a/docs/examples.rst b/docs/examples.rst index 84831c65d..c489e2251 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -27,3 +27,4 @@ Examples examples.websocket examples.tls examples.autotls + examples.perf diff --git a/docs/libp2p.perf.rst b/docs/libp2p.perf.rst new file mode 100644 index 000000000..2b9d36386 --- /dev/null +++ b/docs/libp2p.perf.rst @@ -0,0 +1,41 @@ +libp2p.perf package +=================== + +The perf module implements the libp2p performance measurement protocol, +which allows measuring throughput between two libp2p nodes by sending +and receiving configurable amounts of data. + +Submodules +---------- + +libp2p.perf.constants module +---------------------------- + +.. automodule:: libp2p.perf.constants + :members: + :undoc-members: + :show-inheritance: + +libp2p.perf.types module +------------------------ + +.. automodule:: libp2p.perf.types + :members: + :undoc-members: + :show-inheritance: + +libp2p.perf.perf\_service module +-------------------------------- + +.. automodule:: libp2p.perf.perf_service + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: libp2p.perf + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/libp2p.rst b/docs/libp2p.rst index 62daa743d..c4a61f6a5 100644 --- a/docs/libp2p.rst +++ b/docs/libp2p.rst @@ -16,6 +16,7 @@ Subpackages libp2p.kad_dht libp2p.network libp2p.peer + libp2p.perf libp2p.protocol_muxer libp2p.pubsub libp2p.rcmgr diff --git a/examples/perf/perf_example.py b/examples/perf/perf_example.py new file mode 100644 index 000000000..25b6ad4fc --- /dev/null +++ b/examples/perf/perf_example.py @@ -0,0 +1,165 @@ +""" +Perf protocol example - Measure transfer performance between two libp2p nodes. + +Usage: + # Terminal 1 - Run the server (listener) + python perf_example.py -p 8000 + + # Terminal 2 - Run the client (measures performance to server) + python perf_example.py -p 8001 -d /ip4/127.0.0.1/tcp/8000/p2p/ +""" + +import argparse + +import multiaddr +import trio + +from libp2p import new_host +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.perf import PROTOCOL_NAME, PerfService + +ONE_UNIT = 16 * 16 # 256 bytes +UPLOAD_BYTES = ONE_UNIT * 10 # 2560 bytes upload +DOWNLOAD_BYTES = ONE_UNIT * 10 # 2560 bytes download + + +async def run_server(host, perf_service) -> None: + """Run as a perf server - listens for incoming perf requests.""" + await perf_service.start() + + print("\nPerf server ready, listening on:") + for addr in host.get_addrs(): + print(f" {addr}") + + print(f"\nProtocol: {PROTOCOL_NAME}") + print("\nRun client with:") + print(f" python perf_example.py -d {host.get_addrs()[0]}") + print("\nWaiting for incoming perf requests...") + + await trio.sleep_forever() + + +async def run_client( + host, perf_service, destination: str, upload_bytes: int, download_bytes: int +) -> None: + """Run as a perf client - measures performance to a remote peer.""" + await perf_service.start() + + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + + print(f"\nConnecting to {info.peer_id}...") + await host.connect(info) + print("Connected!") + + print("\nMeasuring performance:") + print(f" Upload: {upload_bytes} bytes") + print(f" Download: {download_bytes} bytes") + print() + + async for output in perf_service.measure_performance( + maddr, upload_bytes, download_bytes + ): + if output["type"] == "intermediary": + # Progress report + upload_bytes_out = output["upload_bytes"] + download_bytes_out = output["download_bytes"] + time_s = output["time_seconds"] + + if upload_bytes_out > 0: + throughput = upload_bytes_out / time_s if time_s > 0 else 0 + print( + f" Upload: {upload_bytes_out}B in {time_s:.2f}s " + f"({throughput:.0f} B/s)" + ) + elif download_bytes_out > 0: + throughput = download_bytes_out / time_s if time_s > 0 else 0 + print( + f" Download: {download_bytes_out}B in {time_s:.2f}s " + f"({throughput:.0f} B/s)" + ) + + elif output["type"] == "final": + # Final summary + total_time = output["time_seconds"] + total_upload = output["upload_bytes"] + total_download = output["download_bytes"] + total_data = total_upload + total_download + + print(f"\n{'=' * 50}") + print("Performance Results:") + print(f" Total time: {total_time:.3f} seconds") + print(f" Uploaded: {total_upload} bytes") + print(f" Downloaded: {total_download} bytes") + print(f" Total data: {total_data} bytes") + print(f" Throughput: {total_data / total_time:.0f} bytes/s") + print(f"{'=' * 50}") + + await perf_service.stop() + + +async def run(port: int, destination: str, upload_mb: int, download_mb: int) -> None: + """Main run function.""" + from libp2p.utils.address_validation import find_free_port + + if port <= 0: + port = find_free_port() + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{port}")] + host = new_host(listen_addrs=listen_addrs) + + # Create perf service + perf_service = PerfService(host) + + async with host.run(listen_addrs=listen_addrs): + if destination: + # Client mode + await run_client( + host, + perf_service, + destination, + upload_mb * ONE_UNIT, + download_mb * ONE_UNIT, + ) + else: + # Server mode + await run_server(host, perf_service) + + +def main() -> None: + description = """ + Perf protocol example - Measure transfer performance between libp2p nodes. + + To use: + 1. Start server: python perf_example.py -p 8000 + 2. Start client: python perf_example.py -d + """ + + parser = argparse.ArgumentParser(description=description) + parser.add_argument("-p", "--port", default=0, type=int, help="listening port") + parser.add_argument("-d", "--destination", type=str, help="destination multiaddr") + parser.add_argument( + "-u", + "--upload", + default=10, + type=int, + help="upload size in units of 256 bytes (default: 10)", + ) + parser.add_argument( + "-D", + "--download", + default=10, + type=int, + help="download size in units of 256 bytes (default: 10)", + ) + + args = parser.parse_args() + + try: + trio.run(run, args.port, args.destination, args.upload, args.download) + except KeyboardInterrupt: + print("\nShutting down...") + + +if __name__ == "__main__": + main() diff --git a/interop/README.md b/interop/README.md new file mode 100644 index 000000000..512a9ee86 --- /dev/null +++ b/interop/README.md @@ -0,0 +1,86 @@ +# Interop Directory + +This directory contains **interoperability and performance test material** for py-libp2p, designed to integrate with the [unified-testing framework](https://github.com/libp2p/unified-testing) (successor to libp2p/test-plans). + +## Purpose + +The `interop/` directory houses the Docker images, test scripts, and configuration needed to run py-libp2p in cross-implementation tests. The unified-testing convention allows this material to live in the implementation repository itself, which: + +- Enables **local testing** against other libp2p implementations (Rust, Go, JS, .NET) without syncing between repos +- Serves as an **example** for setting up Docker-based protocol tests with py-libp2p +- Keeps test implementations versioned and developed alongside the library + +## Directory Layout + +``` +interop/ +├── perf/ # Performance (throughput, latency) tests +│ ├── Dockerfile # Builds image for perf protocol testing +│ ├── perf_test.py # Test application (listener + dialer) +│ └── pyproject.toml # Dependencies (libp2p, redis, etc.) +└── transport/ # Transport interoperability tests + ├── Dockerfile + ├── ping_test.py + └── pyproject.toml +``` + +Each subdirectory corresponds to a **test type** in the unified-testing framework. + +## How It Integrates with Unified-Testing + +The unified-testing framework (see `unified-testing/docs/`) runs tests by: + +1. **Building Docker images** – Uses `images.yaml` to define implementations. For py-libp2p, the build uses `source.type: github` (or `local`) pointing at this repo, with a `dockerfile` path such as `interop/perf/Dockerfile`. +1. **Running tests** – The framework starts **listener** and **dialer** containers on a shared network, coordinates them via **Redis**, and collects results. + +### Perf Tests (`interop/perf/`) + +Perf tests measure: + +- **Upload throughput** – How fast the dialer sends data to the listener +- **Download throughput** – How fast the dialer receives data from the listener +- **Latency** – Round-trip time for small messages + +The test app (`perf_test.py`) implements the [libp2p perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md) (`/perf/1.0.0`) and follows [write-a-perf-test-app.md](https://github.com/libp2p/unified-testing/blob/master/docs/write-a-perf-test-app.md): + +- Reads config from environment variables (`IS_DIALER`, `REDIS_ADDR`, `TEST_KEY`, `TRANSPORT`, etc.) +- Listener publishes its multiaddr to Redis; dialer polls and connects +- Dialer runs upload/download/latency iterations and outputs YAML results to stdout +- All logging goes to stderr (stdout is reserved for results) + +### Transport Tests (`interop/transport/`) + +Transport tests verify that py-libp2p can establish connections and exchange protocols with other implementations over various transport, secure channel, and muxer combinations (TCP, QUIC, WebSocket, Noise, TLS, yamux, mplex). + +## Build Context + +When building from the py-libp2p repo: + +- **Build context** = repository root (not `interop/perf/` or `interop/transport/`) +- The Dockerfile uses `COPY . /app/py-libp2p` to include the full libp2p source, then copies the test script and installs dependencies so the test app uses the in-repo libp2p. + +This ensures each Docker image is built against the exact py-libp2p version in the repo or specified commit. + +## Running Locally + +To run interop tests, use the unified-testing framework: + +```bash +# From the unified-testing repo +cd perf +./run.sh --impl-select "python-v0.x" # When python is in images.yaml +``` + +To run the perf test script directly (e.g. for development), see `examples/perf/perf_example.py` and the [perf protocol documentation](../../libp2p/perf/). + +## Relationship to CI + +- Code in `interop/` is **not** run by py-libp2p's own CI (which uses `tests/`). +- The unified-testing framework runs this code when py-libp2p is included in `images.yaml` and the perf/transport test suite is executed (e.g. in the test-plans or unified-testing repo). + +## References + +- [Unified-testing framework](https://github.com/libp2p/unified-testing) – Bash + Docker test runner +- [write-a-perf-test-app.md](https://github.com/libp2p/unified-testing/blob/master/docs/write-a-perf-test-app.md) – Perf test app specification +- [write-a-transport-test-app.md](https://github.com/libp2p/unified-testing/blob/master/docs/write-a-transport-test-app.md) – Transport test app specification +- [libp2p perf protocol spec](https://github.com/libp2p/specs/blob/master/perf/perf.md) diff --git a/interop/perf/Dockerfile b/interop/perf/Dockerfile new file mode 100644 index 000000000..1329537b4 --- /dev/null +++ b/interop/perf/Dockerfile @@ -0,0 +1,36 @@ +FROM python:3.13-slim + +WORKDIR /app + +# Install system dependencies (contributing.rst + transport Dockerfile) +RUN apt-get update && apt-get install -y \ + redis-tools \ + build-essential \ + cmake \ + pkg-config \ + libgmp-dev \ + git \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install uv (docs/contributing.rst Option 2: same as CI) +RUN pip install --no-cache-dir uv +RUN uv --version + +# Build context = repo root (same as interop/transport/Dockerfile for GitHub + local) +COPY . /app/py-libp2p + +RUN uv venv /app/venv + +# Interop/perf project: depends on libp2p from copied clone +COPY interop/perf/pyproject.toml . +RUN uv pip install --python /app/venv/bin/python --upgrade pip && \ + uv pip install --python /app/venv/bin/python --no-cache-dir -e . + +COPY interop/perf/perf_test.py . + +ENV PYTHONUNBUFFERED=1 +ENV CI=true +ENV PATH="/app/venv/bin:${PATH}" + +ENTRYPOINT ["/app/venv/bin/python", "perf_test.py"] diff --git a/interop/perf/perf_test.py b/interop/perf/perf_test.py new file mode 100644 index 000000000..ae376b2d5 --- /dev/null +++ b/interop/perf/perf_test.py @@ -0,0 +1,692 @@ +#!/usr/bin/env python3 +""" +Python libp2p perf test implementation for test-plans perf. + +Follows docs/write-a-perf-test-app.md: +- Reads configuration from environment variables +- Connects to Redis for coordination (SET/GET to match Rust) +- Implements both listener and dialer roles +- Measures upload/download throughput and latency +- Outputs results in YAML format to stdout (stderr for logs) +""" + +from datetime import datetime, timedelta, timezone +import logging +import os +import ssl +import sys +import tempfile +import time +from typing import Any + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID +import multiaddr +import redis +import trio + +try: + ExceptionGroup # noqa: B018 +except NameError: + from exceptiongroup import ExceptionGroup # type: ignore[no-redef] + +from libp2p import create_mplex_muxer_option, create_yamux_muxer_option, new_host +from libp2p.crypto.ed25519 import create_new_key_pair +from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair +from libp2p.custom_types import TProtocol +from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.perf import PROTOCOL_NAME, PerfService +from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport +from libp2p.security.noise.transport import ( + PROTOCOL_ID as NOISE_PROTOCOL_ID, + Transport as NoiseTransport, +) +from libp2p.security.tls.transport import ( + PROTOCOL_ID as TLS_PROTOCOL_ID, + TLSTransport, +) +from libp2p.utils.address_validation import get_available_interfaces + +MAX_TEST_TIMEOUT = 300 +logger = logging.getLogger("libp2p.perf_test") + + +def configure_logging() -> None: + """Configure logging based on DEBUG environment variable.""" + debug_value = os.getenv("DEBUG") or "false" + debug_enabled = debug_value.upper() in ["DEBUG", "1", "TRUE", "YES"] + + for logger_name in [ + "multiaddr", + "multiaddr.transforms", + "multiaddr.codecs", + "multiaddr.codecs.cid", + ]: + logging.getLogger(logger_name).setLevel(logging.WARNING) + + if debug_enabled: + for name in ["", "libp2p.perf_test", "libp2p", "libp2p.perf"]: + logging.getLogger(name).setLevel(logging.DEBUG) + print("Debug logging enabled", file=sys.stderr) + else: + logging.getLogger().setLevel(logging.INFO) + logging.getLogger("libp2p.perf_test").setLevel(logging.INFO) + for name in ["libp2p", "libp2p.transport"]: + logging.getLogger(name).setLevel(logging.WARNING) + + +def _percentile(sorted_values: list[float], p: float) -> float: + """Linear interpolation percentile.""" + n = len(sorted_values) + if n == 0: + return 0.0 + if n == 1: + return sorted_values[0] + index = (p / 100.0) * (n - 1) + lower = int(index) + upper = min(lower + 1, n - 1) + weight = index - lower + return sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight + + +def _is_connection_closed_error(exc: BaseException) -> bool: + """True if this is the expected 'Connection closed' from swarm/mplex on shutdown.""" + msg = str(exc).lower() + if "connection closed" in msg: + return True + if isinstance(exc, ExceptionGroup): + return all(_is_connection_closed_error(e) for e in exc.exceptions) + return False + + +def _compute_stats(samples: list[float], is_latency: bool = False) -> dict[str, Any]: + """Compute min, q1, median, q3, max, outliers, samples (IQR-based).""" + if not samples: + return { + "min": 0.0, + "q1": 0.0, + "median": 0.0, + "q3": 0.0, + "max": 0.0, + "outliers": [], + "samples": [], + } + sorted_vals = sorted(samples) + q1 = _percentile(sorted_vals, 25.0) + median = _percentile(sorted_vals, 50.0) + q3 = _percentile(sorted_vals, 75.0) + iqr = q3 - q1 + lower_fence = q1 - 1.5 * iqr + upper_fence = q3 + 1.5 * iqr + outliers = [v for v in sorted_vals if v < lower_fence or v > upper_fence] + non_outliers = [v for v in sorted_vals if lower_fence <= v <= upper_fence] + if non_outliers: + min_val, max_val = non_outliers[0], non_outliers[-1] + else: + min_val, max_val = sorted_vals[0], sorted_vals[-1] + fmt = "{:.3f}" if is_latency else "{:.2f}" + return { + "min": min_val, + "q1": q1, + "median": median, + "q3": q3, + "max": max_val, + "outliers": [float(fmt.format(x)) for x in outliers], + "samples": [float(fmt.format(x)) for x in sorted_vals], + } + + +class PerfTest: + def __init__(self) -> None: + self.transport = os.getenv("TRANSPORT") + if not self.transport: + raise ValueError("TRANSPORT environment variable is required") + standalone = ["quic-v1"] + self.muxer: str | None = None + self.security: str | None = None + if self.transport not in standalone: + self.muxer = os.getenv("MUXER") or "" + self.security = os.getenv("SECURE_CHANNEL") or "" + if not self.muxer or not self.security: + raise ValueError( + "MUXER and SECURE_CHANNEL required for non-standalone transport" + ) + else: + self.muxer = os.getenv("MUXER") + self.security = os.getenv("SECURE_CHANNEL") + + is_dialer_val = os.getenv("IS_DIALER") + if is_dialer_val is None: + raise ValueError("IS_DIALER environment variable is required") + self.is_dialer = is_dialer_val == "true" + + self.ip = os.getenv("LISTENER_IP") or "0.0.0.0" + self.redis_addr = os.getenv("REDIS_ADDR") + if not self.redis_addr: + raise ValueError("REDIS_ADDR environment variable is required") + if ":" in self.redis_addr: + self.redis_host, port = self.redis_addr.split(":", 1) + self.redis_port = int(port) + else: + self.redis_host = self.redis_addr + self.redis_port = 6379 + + self.test_key = os.getenv("TEST_KEY") + if not self.test_key: + raise ValueError("TEST_KEY environment variable is required") + + self.upload_bytes = int(os.getenv("UPLOAD_BYTES") or "1073741824") + self.download_bytes = int(os.getenv("DOWNLOAD_BYTES") or "1073741824") + self.upload_iterations = int(os.getenv("UPLOAD_ITERATIONS") or "10") + self.download_iterations = int(os.getenv("DOWNLOAD_ITERATIONS") or "10") + self.latency_iterations = int(os.getenv("LATENCY_ITERATIONS") or "100") + + timeout_val = os.getenv("TEST_TIMEOUT_SECS") or "180" + self.test_timeout_seconds = min(int(timeout_val), MAX_TEST_TIMEOUT) + + self.host: Any = None + self.redis_client: redis.Redis[str] | None = None + self.perf_service: PerfService | None = None + + def validate_configuration(self) -> None: + valid_transports = ["tcp", "ws", "wss", "quic-v1"] + valid_security = ["noise", "plaintext", "tls"] + valid_muxers = ["mplex", "yamux"] + standalone = ["quic-v1"] + if self.transport not in valid_transports: + raise ValueError( + f"Unsupported transport: {self.transport}. " + f"Supported: {valid_transports}" + ) + if self.transport not in standalone: + if self.security not in valid_security: + raise ValueError( + f"Unsupported security: {self.security}. " + f"Supported: {valid_security}" + ) + if self.muxer not in valid_muxers: + raise ValueError( + f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}" + ) + + def create_security_options(self) -> tuple[dict[TProtocol, Any], Any]: + standalone = ["quic-v1"] + if self.transport in standalone: + return {}, create_new_key_pair() + key_pair = create_new_key_pair() + if self.security == "noise": + noise_kp = create_new_x25519_key_pair() + noise_transport = NoiseTransport( + libp2p_keypair=key_pair, + noise_privkey=noise_kp.private_key, + early_data=None, + ) + return {NOISE_PROTOCOL_ID: noise_transport}, key_pair + if self.security == "tls": + tls_transport = TLSTransport( + libp2p_keypair=key_pair, + early_data=None, + muxers=None, + ) + return {TLS_PROTOCOL_ID: tls_transport}, key_pair + if self.security == "plaintext": + pt = InsecureTransport( + local_key_pair=key_pair, + secure_bytes_provider=None, + peerstore=None, + ) + return {PLAINTEXT_PROTOCOL_ID: pt}, key_pair + raise ValueError(f"Unsupported security: {self.security}") + + def create_muxer_options(self) -> Any: + if self.transport in ["quic-v1"]: + return None + if self.muxer == "yamux": + return create_yamux_muxer_option() + if self.muxer == "mplex": + return create_mplex_muxer_option() + raise ValueError(f"Unsupported muxer: {self.muxer}") + + def create_tls_client_config(self) -> ssl.SSLContext | None: + if self.transport == "wss": + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + return None + + def create_tls_server_config(self) -> ssl.SSLContext | None: + if self.transport == "wss": + try: + pk = rsa.generate_private_key(public_exponent=65537, key_size=2048) + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, "libp2p.local"), + ] + ) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(pk.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .sign(pk, hashes.SHA256()) + ) + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + with ( + tempfile.NamedTemporaryFile(mode="wb", delete=False) as cf, + tempfile.NamedTemporaryFile(mode="wb", delete=False) as kf, + ): + cf.write(cert.public_bytes(serialization.Encoding.PEM)) + kf.write( + pk.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + ) + ctx.load_cert_chain(cf.name, kf.name) + try: + os.unlink(cf.name) + os.unlink(kf.name) + except Exception: + pass + return ctx + except Exception as e: + print(f"WARNING: TLS server config failed: {e}", file=sys.stderr) + return None + return None + + def _get_ip_value(self, addr: multiaddr.Multiaddr) -> str | None: + return addr.value_for_protocol("ip4") or addr.value_for_protocol("ip6") + + def _get_protocol_names(self, addr: multiaddr.Multiaddr) -> list[str]: + return [p.name for p in addr.protocols()] + + def _extract_and_preserve_p2p( + self, addr: multiaddr.Multiaddr + ) -> tuple[multiaddr.Multiaddr, str | None]: + p2p_value = None + if "p2p" in self._get_protocol_names(addr): + p2p_value = addr.value_for_protocol("p2p") + if p2p_value: + addr = addr.decapsulate(multiaddr.Multiaddr(f"/p2p/{p2p_value}")) + return addr, p2p_value + + def _encapsulate_with_p2p( + self, addr: multiaddr.Multiaddr, p2p_value: str | None + ) -> multiaddr.Multiaddr: + if p2p_value: + return addr.encapsulate(multiaddr.Multiaddr(f"/p2p/{p2p_value}")) + return addr + + def _build_quic_addr(self, ip_value: str, port: int) -> multiaddr.Multiaddr: + if ":" in ip_value: + base = multiaddr.Multiaddr(f"/ip6/{ip_value}/udp/{port}") + else: + base = multiaddr.Multiaddr(f"/ip4/{ip_value}/udp/{port}") + return base.encapsulate(multiaddr.Multiaddr("/quic-v1")) + + def create_listen_addresses(self, port: int = 0) -> list[multiaddr.Multiaddr]: + base_addrs = get_available_interfaces(port, protocol="tcp") + if self.transport == "quic-v1": + out = [] + for addr in base_addrs: + ip_value = self._get_ip_value(addr) + tcp_port = addr.value_for_protocol("tcp") or port + if ip_value: + qa = self._build_quic_addr(ip_value, tcp_port) + _, p2p = self._extract_and_preserve_p2p(addr) + qa = self._encapsulate_with_p2p(qa, p2p) + out.append(qa) + return out if out else [self._build_quic_addr("0.0.0.0", port)] + if self.transport == "ws": + out = [] + for addr in base_addrs: + try: + names = self._get_protocol_names(addr) + if "ws" in names or "wss" in names: + out.append(addr) + else: + a, p2p = self._extract_and_preserve_p2p(addr) + wa = a.encapsulate(multiaddr.Multiaddr("/ws")) + out.append(self._encapsulate_with_p2p(wa, p2p)) + except Exception: + pass + return out if out else [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}/ws")] + if self.transport == "wss": + out = [] + for addr in base_addrs: + try: + names = self._get_protocol_names(addr) + a, p2p = self._extract_and_preserve_p2p(addr) + if "wss" in names: + out.append(addr) + elif "ws" in names: + a_ = a.decapsulate(multiaddr.Multiaddr("/ws")) + wss = a_.encapsulate(multiaddr.Multiaddr("/wss")) + out.append(self._encapsulate_with_p2p(wss, p2p)) + else: + wss = a.encapsulate(multiaddr.Multiaddr("/wss")) + out.append(self._encapsulate_with_p2p(wss, p2p)) + except Exception: + pass + return out if out else [multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}/wss")] + return base_addrs + + def _filter_addresses_by_transport( + self, addresses: list[multiaddr.Multiaddr] + ) -> list[multiaddr.Multiaddr]: + out = [] + for addr in addresses: + names = self._get_protocol_names(addr) + if self.transport == "ws" and ("ws" in names or "wss" in names): + out.append(addr) + elif self.transport == "wss" and "wss" in names: + out.append(addr) + elif self.transport == "quic-v1" and "quic-v1" in names: + out.append(addr) + elif self.transport == "tcp" and not any( + p in names for p in ["ws", "wss", "quic-v1"] + ): + out.append(addr) + return out if out else addresses + + def get_container_ip(self) -> str: + import socket + import subprocess + + try: + r = subprocess.run( + ["hostname", "-I"], capture_output=True, text=True, timeout=5 + ) + if r.returncode == 0 and r.stdout.strip(): + return r.stdout.strip().split()[0] + except Exception: + pass + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except Exception: + return "172.17.0.1" + + def _replace_loopback_ip(self, addr: multiaddr.Multiaddr) -> str: + ip_value = self._get_ip_value(addr) + if ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]: + return str(addr) + actual = self.get_container_ip() + names = self._get_protocol_names(addr) + is_ipv6 = "ip6" in names + parts = [f"/ip6/{actual}" if is_ipv6 else f"/ip4/{actual}"] + found = False + for proto, value in addr.items(): + if proto.name in ["ip4", "ip6"]: + found = True + continue + if found: + parts.append(f"/{proto.name}/{value}" if value else f"/{proto.name}") + return str(multiaddr.Multiaddr("".join(parts))) + + def _get_publishable_address(self, addresses: list[multiaddr.Multiaddr]) -> str: + filtered = self._filter_addresses_by_transport(addresses) + if not filtered: + filtered = addresses + for addr in filtered: + ip_value = self._get_ip_value(addr) + if ip_value and ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]: + return str(addr) + return self._replace_loopback_ip(filtered[0]) + + async def _connect_redis_with_retry( + self, max_retries: int = 10, retry_delay: float = 1.0 + ) -> None: + print("Connecting to Redis...", file=sys.stderr) + for attempt in range(max_retries): + try: + self.redis_client = redis.Redis( + host=self.redis_host, + port=self.redis_port, + decode_responses=True, + ) + self.redis_client.ping() + print(f"Connected to Redis on attempt {attempt + 1}", file=sys.stderr) + return + except Exception as e: + print(f"Redis attempt {attempt + 1} failed: {e}", file=sys.stderr) + if attempt < max_retries - 1: + await trio.sleep(retry_delay) + raise RuntimeError(f"Failed to connect to Redis after {max_retries} attempts") + + async def run_listener(self) -> None: + self.validate_configuration() + await self._connect_redis_with_retry() + + sec_opt, key_pair = self.create_security_options() + muxer_opt = self.create_muxer_options() + listen_addrs = self.create_listen_addresses(0) + tls_client = self.create_tls_client_config() + tls_server = self.create_tls_server_config() + + self.host = new_host( + key_pair=key_pair, + sec_opt=sec_opt, + muxer_opt=muxer_opt, + listen_addrs=listen_addrs, + enable_quic=(self.transport == "quic-v1"), + tls_client_config=tls_client, + tls_server_config=tls_server, + ) + self.perf_service = PerfService(self.host) + await self.perf_service.start() + print(f"Perf service started (protocol {PROTOCOL_NAME})", file=sys.stderr) + + async with self.host.run(listen_addrs=listen_addrs): + all_addrs = self.host.get_addrs() + if not all_addrs: + raise RuntimeError("No listen addresses available") + actual_addr = self._get_publishable_address(all_addrs) + print(f"Publishing address: {actual_addr}", file=sys.stderr) + redis_key = f"{self.test_key}_listener_multiaddr" + assert self.redis_client is not None + self.redis_client.set(redis_key, actual_addr) + print("Listener ready, waiting for dialer...", file=sys.stderr) + await trio.sleep_forever() + + async def _wait_for_listener_addr(self) -> str: + redis_key = f"{self.test_key}_listener_multiaddr" + timeout = min(self.test_timeout_seconds, MAX_TEST_TIMEOUT) + deadline = time.monotonic() + timeout + assert self.redis_client is not None + while time.monotonic() < deadline: + addr = self.redis_client.get(redis_key) + if addr: + return addr + await trio.sleep(0.5) + raise RuntimeError( + f"Timeout waiting for listener address (key {redis_key}) after {timeout}s" + ) + + async def _one_measurement( + self, + send_bytes: int, + recv_bytes: int, + ) -> float: + """Run one measure_performance call and return elapsed time in seconds.""" + assert self.host is not None + assert self.perf_service is not None + maddr = multiaddr.Multiaddr(self.listener_addr) + start = time.monotonic() + async for _ in self.perf_service.measure_performance( + maddr, send_bytes, recv_bytes + ): + pass + return time.monotonic() - start + + async def run_dialer(self) -> None: + self.validate_configuration() + await self._connect_redis_with_retry() + print("Waiting for listener address...", file=sys.stderr) + self.listener_addr = await self._wait_for_listener_addr() + print(f"Got listener address: {self.listener_addr}", file=sys.stderr) + + sec_opt, key_pair = self.create_security_options() + muxer_opt = self.create_muxer_options() + # Dialer needs listen_addrs for ws/wss so transport is registered; + # for quic/tcp pass [] (host.run still starts swarm/nursery) + dialer_listen_addrs = ( + self.create_listen_addresses(0) if self.transport in ["ws", "wss"] else None + ) + tls_client = self.create_tls_client_config() + tls_server = None + + kw: dict[str, Any] = { + "key_pair": key_pair, + "sec_opt": sec_opt, + "muxer_opt": muxer_opt, + "enable_quic": (self.transport == "quic-v1"), + "tls_client_config": tls_client, + "tls_server_config": tls_server, + } + if dialer_listen_addrs: + kw["listen_addrs"] = dialer_listen_addrs + self.host = new_host(**kw) + self.perf_service = PerfService(self.host) + await self.perf_service.start() + + # Must run host inside host.run() so swarm/nursery are active + # (required for connect and QUIC) + try: + async with self.host.run(listen_addrs=dialer_listen_addrs or []): + # Brief delay so listener is fully listening before we dial + await trio.sleep(1.0) + + maddr = multiaddr.Multiaddr(self.listener_addr) + info = info_from_p2p_addr(maddr) + listener_peer_id = info.peer_id + await self.host.connect(info) + print("Connected to listener", file=sys.stderr) + + upload_samples: list[float] = [] + for i in range(self.upload_iterations): + elapsed = await self._one_measurement(self.upload_bytes, 0) + gbps = ( + (self.upload_bytes * 8.0) / elapsed / 1e9 + if elapsed > 0 + else 0.0 + ) + upload_samples.append(gbps) + print( + f"Upload {i + 1}/{self.upload_iterations}: {gbps:.2f} Gbps", + file=sys.stderr, + ) + + download_samples: list[float] = [] + for i in range(self.download_iterations): + elapsed = await self._one_measurement(0, self.download_bytes) + gbps = ( + (self.download_bytes * 8.0) / elapsed / 1e9 + if elapsed > 0 + else 0.0 + ) + download_samples.append(gbps) + print( + f"Download {i + 1}/{self.download_iterations}: {gbps:.2f} Gbps", + file=sys.stderr, + ) + + latency_samples: list[float] = [] + for i in range(self.latency_iterations): + elapsed = await self._one_measurement(1, 1) + latency_samples.append(elapsed * 1000.0) + print("Latency iterations done", file=sys.stderr) + + u = _compute_stats(upload_samples, is_latency=False) + d = _compute_stats(download_samples, is_latency=False) + lat = _compute_stats(latency_samples, is_latency=True) + + # YAML to stdout only (per write-a-perf-test-app.md) + print("upload:") + print(f" iterations: {self.upload_iterations}") + print(f" min: {u['min']:.2f}") + print(f" q1: {u['q1']:.2f}") + print(f" median: {u['median']:.2f}") + print(f" q3: {u['q3']:.2f}") + print(f" max: {u['max']:.2f}") + print(f" outliers: {u['outliers']}") + print(f" samples: {u['samples']}") + print(" unit: Gbps") + print("download:") + print(f" iterations: {self.download_iterations}") + print(f" min: {d['min']:.2f}") + print(f" q1: {d['q1']:.2f}") + print(f" median: {d['median']:.2f}") + print(f" q3: {d['q3']:.2f}") + print(f" max: {d['max']:.2f}") + print(f" outliers: {d['outliers']}") + print(f" samples: {d['samples']}") + print(" unit: Gbps") + print("latency:") + print(f" iterations: {self.latency_iterations}") + print(f" min: {lat['min']:.3f}") + print(f" q1: {lat['q1']:.3f}") + print(f" median: {lat['median']:.3f}") + print(f" q3: {lat['q3']:.3f}") + print(f" max: {lat['max']:.3f}") + print(f" outliers: {lat['outliers']}") + print(f" samples: {lat['samples']}") + print(" unit: ms") + + # Graceful close: disconnect listener so it sees a clean + # close, then stop services + try: + await self.host.disconnect(listener_peer_id) + await trio.sleep(0.5) + except Exception as e: + logger.debug("Disconnect: %s", e) + try: + await self.perf_service.stop() + except Exception as e: + logger.debug("PerfService.stop: %s", e) + if self.redis_client: + try: + self.redis_client.close() + except Exception: + pass + except BaseException as e: + # Swarm/mplex may raise "Connection closed" on disconnect; + # treat as success + if not _is_connection_closed_error(e): + raise + + async def run(self) -> None: + try: + await self._connect_redis_with_retry() + if self.is_dialer: + await self.run_dialer() + else: + await self.run_listener() + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + import traceback + + traceback.print_exc(file=sys.stderr) + if self.redis_client: + self.redis_client.close() + sys.exit(1) + + +async def main() -> None: + configure_logging() + test = PerfTest() + await test.run() + + +if __name__ == "__main__": + trio.run(main) diff --git a/interop/perf/pyproject.toml b/interop/perf/pyproject.toml new file mode 100644 index 000000000..899922697 --- /dev/null +++ b/interop/perf/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "py-libp2p-perf-test" +version = "0.1.0" +description = "Python libp2p perf test implementation for test-plans perf" +authors = [ + {name = "libp2p", email = "team@libp2p.io"} +] +requires-python = ">=3.11" +dependencies = [ + "libp2p @ file:///app/py-libp2p", + "redis>=4.0.0", + "typing-extensions>=4.0.0", + "cryptography>=41.0.0", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["perf_test*"] diff --git a/libp2p/abc.py b/libp2p/abc.py index d0a53adf2..399cc7270 100644 --- a/libp2p/abc.py +++ b/libp2p/abc.py @@ -4,6 +4,7 @@ ) from collections.abc import ( AsyncIterable, + AsyncIterator, Iterable, KeysView, Sequence, @@ -3286,3 +3287,56 @@ async def publish(self, topic_id: str | list[str], data: bytes) -> None: """ ... + + +# -------------------------- perf interface.py -------------------------- + + +class IPerf(ABC): + """ + Interface for the perf protocol service. + + Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md + """ + + @abstractmethod + async def start(self) -> None: + """Start the perf service and register the protocol handler.""" + ... + + @abstractmethod + async def stop(self) -> None: + """Stop the perf service and unregister the protocol handler.""" + ... + + @abstractmethod + def is_started(self) -> bool: + """Check if the service is currently running.""" + ... + + @abstractmethod + def measure_performance( + self, + multiaddr: Multiaddr, + send_bytes: int, + recv_bytes: int, + ) -> AsyncIterator[Any]: + """ + Measure transfer performance to a remote peer. + + Parameters + ---------- + multiaddr : Multiaddr + The address of the remote peer to test against. + send_bytes : int + Number of bytes to upload to the remote peer. + recv_bytes : int + Number of bytes to request the remote peer to send back. + + Yields + ------ + PerfOutput + Progress reports during the transfer, with a final summary at the end. + + """ + ... diff --git a/libp2p/perf/__init__.py b/libp2p/perf/__init__.py new file mode 100644 index 000000000..2d0ffde5e --- /dev/null +++ b/libp2p/perf/__init__.py @@ -0,0 +1,32 @@ +""" +Perf protocol for measuring transfer performance. + +This module implements the libp2p perf protocol as specified in: +https://github.com/libp2p/specs/blob/master/perf/perf.md +""" + +from .constants import ( + PROTOCOL_NAME, + WRITE_BLOCK_SIZE, +) +from .types import ( + PerfComponents, + PerfInit, + PerfOutput, +) +from libp2p.abc import IPerf +from .perf_service import PerfService + +__all__ = [ + # Constants + "PROTOCOL_NAME", + "WRITE_BLOCK_SIZE", + # Types + "PerfOutput", + "PerfInit", + "PerfComponents", + # Interface + "IPerf", + # Implementation + "PerfService", +] diff --git a/libp2p/perf/constants.py b/libp2p/perf/constants.py new file mode 100644 index 000000000..a801967de --- /dev/null +++ b/libp2p/perf/constants.py @@ -0,0 +1,5 @@ +# Protocol constants for the perf protocol +# https://github.com/libp2p/specs/blob/master/perf/perf.md + +PROTOCOL_NAME = "/perf/1.0.0" +WRITE_BLOCK_SIZE = 65536 diff --git a/libp2p/perf/perf_service.py b/libp2p/perf/perf_service.py new file mode 100644 index 000000000..55798572b --- /dev/null +++ b/libp2p/perf/perf_service.py @@ -0,0 +1,284 @@ +""" +Perf protocol service implementation. +Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md + +Note: + This service is designed for benchmarking and performance testing. + The server accepts any uint64 value for bytes to send back, which + is intentional to support high-volume stress testing. This service + should only be enabled in trusted environments or test networks. + +""" + +from collections.abc import AsyncIterator +import logging +import struct +import time +from typing import cast + +from multiaddr import Multiaddr + +from libp2p.abc import IHost, INetStream, IPerf +from libp2p.custom_types import TProtocol +from libp2p.peer.id import ID as PeerID +from libp2p.peer.peerinfo import PeerInfo + +from .constants import ( + PROTOCOL_NAME, + WRITE_BLOCK_SIZE, +) +from .types import PerfInit, PerfOutput + +logger = logging.getLogger(__name__) + + +class PerfService(IPerf): + """ + Implementation of the perf protocol. + + The perf protocol is used to measure transfer performance within and across + libp2p implementations. + """ + + def __init__(self, host: IHost, init: PerfInit | None = None) -> None: + """ + Initialize the PerfService. + + Parameters + ---------- + host : IHost + The libp2p host instance. + init : PerfInit, optional + Initialization options for the service. + + """ + init_opts: PerfInit = init if init is not None else {} + + self._host = host + self._started = False + self._protocol = TProtocol(init_opts.get("protocol_name", PROTOCOL_NAME)) + self._write_block_size = init_opts.get("write_block_size", WRITE_BLOCK_SIZE) + + # Pre-allocate buffer for sending data + self._buf = bytes(self._write_block_size) + + async def start(self) -> None: + """Start the perf service and register the protocol handler.""" + self._host.set_stream_handler(self._protocol, self._handle_message) + self._started = True + logger.debug("Perf service started with protocol %s", self._protocol) + + async def stop(self) -> None: + """Stop the perf service and unregister the protocol handler.""" + # Note: py-libp2p may not have unregister, but we set the flag + self._started = False + logger.debug("Perf service stopped") + + def is_started(self) -> bool: + """Check if the service is currently running.""" + return self._started + + async def _handle_message(self, stream: INetStream) -> None: + """ + Handle incoming perf protocol messages (server side). + + Reads data from the stream, extracts the number of bytes to send back + from the first 8 bytes, then sends that many bytes back. + """ + if not self._started: + logger.debug("Perf service received stream while stopped; resetting") + try: + await stream.reset() + except Exception: + logger.debug("Failed to reset stopped perf stream", exc_info=True) + return + + try: + # Read exactly 8 bytes for the header (handle TCP fragmentation) + header: bytes = b"" + while len(header) < 8: + try: + chunk = cast(bytes, await stream.read(8 - len(header))) # type: ignore[redundant-cast] + if not chunk: + logger.error("Stream closed before header was fully received") + await stream.reset() + return + header += chunk + except Exception: + logger.error("Error reading header") + await stream.reset() + return + + # Parse the big-endian unsigned 64-bit integer + bytes_to_send_back = struct.unpack(">Q", header)[0] + logger.debug("Received request to send back %d bytes", bytes_to_send_back) + + # Read remaining data until EOF (client closes write side) + while True: + try: + data = await stream.read(self._write_block_size) + if not data: + break + except Exception: + break + + # Send back the requested number of bytes + while bytes_to_send_back > 0: + to_send = min(self._write_block_size, bytes_to_send_back) + await stream.write(self._buf[:to_send]) + bytes_to_send_back -= to_send + + await stream.close() + + except Exception as e: + logger.error("Error handling perf message: %s", e) + await stream.reset() + + async def measure_performance( + self, + multiaddr: Multiaddr, + send_bytes: int, + recv_bytes: int, + ) -> AsyncIterator[PerfOutput]: + """ + Measure transfer performance to a remote peer. + + Parameters + ---------- + multiaddr : Multiaddr + The address of the remote peer to test against. + send_bytes : int + Number of bytes to upload to the remote peer. + recv_bytes : int + Number of bytes to request the remote peer to send back. + + Yields + ------ + PerfOutput + Progress reports during the transfer, with a final summary at the end. + + """ + initial_start_time = time.time() + last_reported_time = time.time() + + # Extract peer ID from multiaddr and connect + peer_id = PeerID.from_base58(str(multiaddr).split("/p2p/")[-1]) + peer_info = PeerInfo(peer_id, [multiaddr]) + + # Connect to the peer + await self._host.connect(peer_info) + logger.debug( + "Opened connection after %.3f ms", + (time.time() - last_reported_time) * 1000, + ) + last_reported_time = time.time() + + # Open a new stream + stream = await self._host.new_stream(peer_id, [self._protocol]) + logger.debug( + "Opened stream after %.3f ms", + (time.time() - last_reported_time) * 1000, + ) + last_reported_time = time.time() + + last_amount_of_bytes_sent = 0 + total_bytes_sent = 0 + upload_start = time.time() + + try: + # Send the number of bytes we want to receive (as big-endian u64) + header = struct.pack(">Q", recv_bytes) + await stream.write(header) + + logger.debug("Sending %d bytes to %s", send_bytes, peer_id) + + # Upload phase + bytes_remaining = send_bytes + while bytes_remaining > 0: + to_send = min(self._write_block_size, bytes_remaining) + await stream.write(self._buf[:to_send]) + bytes_remaining -= to_send + last_amount_of_bytes_sent += to_send + total_bytes_sent += to_send + + # Yield intermediary progress every second + if time.time() - last_reported_time > 1.0: + yield PerfOutput( + type="intermediary", + time_seconds=time.time() - last_reported_time, + upload_bytes=last_amount_of_bytes_sent, + download_bytes=0, + ) + last_reported_time = time.time() + last_amount_of_bytes_sent = 0 + + logger.debug( + "Upload complete after %.3f ms", (time.time() - upload_start) * 1000 + ) + + # Close the write side to signal we're done sending + # Note: close_write() is available on NetStream + # but not on INetStream interface + if hasattr(stream, "close_write"): + await stream.close_write() # type: ignore[attr-defined] + + # Download phase + last_amount_of_bytes_received = 0 + last_reported_time = time.time() + total_bytes_received = 0 + download_start = time.time() + + while total_bytes_received < recv_bytes: + try: + data = await stream.read(self._write_block_size) + if not data: + break + + last_amount_of_bytes_received += len(data) + total_bytes_received += len(data) + + # Yield intermediary progress every second + if time.time() - last_reported_time > 1.0: + yield PerfOutput( + type="intermediary", + time_seconds=time.time() - last_reported_time, + upload_bytes=0, + download_bytes=last_amount_of_bytes_received, + ) + last_reported_time = time.time() + last_amount_of_bytes_received = 0 + except Exception: + break + + logger.debug( + "Download complete after %.3f ms", (time.time() - download_start) * 1000 + ) + + if total_bytes_received != recv_bytes: + raise ValueError( + f"Expected to receive {recv_bytes} bytes, " + f"but received {total_bytes_received}" + ) + + # Yield final result + yield PerfOutput( + type="final", + time_seconds=time.time() - initial_start_time, + upload_bytes=total_bytes_sent, + download_bytes=total_bytes_received, + ) + + logger.debug("Performed %s to %s", self._protocol, peer_id) + + except Exception as e: + logger.error( + "Error sending %d/%d bytes to %s: %s", + total_bytes_sent, + send_bytes, + peer_id, + e, + ) + await stream.reset() + raise + finally: + await stream.close() diff --git a/libp2p/perf/types.py b/libp2p/perf/types.py new file mode 100644 index 000000000..11e117daf --- /dev/null +++ b/libp2p/perf/types.py @@ -0,0 +1,36 @@ +""" +Perf protocol types. + +Spec: https://github.com/libp2p/specs/blob/master/perf/perf.md +""" + +from typing import ( + TYPE_CHECKING, + Literal, + TypedDict, +) + +if TYPE_CHECKING: + from libp2p.abc import IHost + + +class PerfOutput(TypedDict): + """Output data from a performance measurement.""" + + type: Literal["connection", "stream", "intermediary", "final"] + time_seconds: float + upload_bytes: int + download_bytes: int + + +class PerfInit(TypedDict, total=False): + """Initialization options for the perf service.""" + + protocol_name: str + write_block_size: int # Default: 65536 (64KB) + + +class PerfComponents(TypedDict): + """Components required by the perf service.""" + + host: "IHost" diff --git a/newsfragments/1169.feature.rst b/newsfragments/1169.feature.rst new file mode 100644 index 000000000..6b935ccc8 --- /dev/null +++ b/newsfragments/1169.feature.rst @@ -0,0 +1,9 @@ +Added implementation of the libp2p perf protocol for measuring transfer performance. + +The perf protocol allows benchmarking data transfer speeds between libp2p nodes. It includes: +- ``PerfService`` class for perf protocol server/client operations +- ``measure_performance()`` method to measure upload/download throughput to a remote peer +- Server-side handling for responding to perf protocol requests +- Detailed metrics output including latency, upload/download times, and throughput in bytes/second + +See the spec at https://github.com/libp2p/specs/blob/master/perf/perf.md diff --git a/tests/core/perf/__init__.py b/tests/core/perf/__init__.py new file mode 100644 index 000000000..ecd9f41d6 --- /dev/null +++ b/tests/core/perf/__init__.py @@ -0,0 +1 @@ +# Empty init file for perf tests package. diff --git a/tests/core/perf/conftest.py b/tests/core/perf/conftest.py new file mode 100644 index 000000000..631c11f7b --- /dev/null +++ b/tests/core/perf/conftest.py @@ -0,0 +1,73 @@ +"""Shared fixtures and mocks for perf protocol tests.""" + +import pytest + +from libp2p.perf import PerfService + + +class MockNetStream: + """Mock stream for testing server handler in isolation.""" + + def __init__( + self, + data_to_read: bytes, + *, + raise_on_read: bool = False, + close_after_bytes: int | None = None, + raise_after_bytes: int | None = None, + ): + self._read_buffer = data_to_read + self._write_buffer = b"" + self._reset_called = False + self._closed = False + self._raise_on_read = raise_on_read + self._close_after_bytes = close_after_bytes + self._raise_after_bytes = raise_after_bytes + self._bytes_read = 0 + + async def read(self, n: int) -> bytes: + if self._raise_on_read: + raise ConnectionError("Mock read error") + + if self._raise_after_bytes is not None: + if self._bytes_read >= self._raise_after_bytes: + raise ConnectionError("Mock read error after bytes") + + if self._close_after_bytes is not None: + if self._bytes_read >= self._close_after_bytes: + return b"" + + chunk = self._read_buffer[:n] + self._read_buffer = self._read_buffer[n:] + self._bytes_read += len(chunk) + return chunk + + async def write(self, data: bytes) -> None: + self._write_buffer += data + + async def reset(self) -> None: + self._reset_called = True + + async def close(self) -> None: + self._closed = True + + +class MockHost: + """Minimal mock host for PerfService initialization.""" + + def set_stream_handler(self, protocol, handler): + pass + + +@pytest.fixture +def mock_host(): + """Provide a mock host for testing.""" + return MockHost() + + +@pytest.fixture +async def perf_service(mock_host): + """Provide a PerfService instance for testing.""" + service = PerfService(mock_host) + await service.start() + return service diff --git a/tests/core/perf/test_perf_client.py b/tests/core/perf/test_perf_client.py new file mode 100644 index 000000000..82ec80496 --- /dev/null +++ b/tests/core/perf/test_perf_client.py @@ -0,0 +1,148 @@ +"""Tests for perf protocol client-side functionality.""" + +import struct + +import pytest + +from libp2p.perf import PerfService +from tests.utils.factories import host_pair_factory + + +@pytest.mark.trio +async def test_client_upload_small_size(security_protocol): + """Test client upload with small sizes: 0, 1, 100, 1024 bytes.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + # Test various small upload sizes + test_sizes = [0, 1, 100, 1024] + + for send_bytes in test_sizes: + recv_bytes = 0 # No download for this test + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + final = results[-1] + assert final["type"] == "final" + assert final["upload_bytes"] == send_bytes, ( + f"Expected upload_bytes={send_bytes}, got {final['upload_bytes']}" + ) + + +@pytest.mark.trio +async def test_client_download_small_size(security_protocol): + """Test client download with small sizes: 0, 1, 100, 1024 bytes.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + # Test various small download sizes + test_sizes = [0, 1, 100, 1024] + + for recv_bytes in test_sizes: + send_bytes = 8 # Minimal upload (just header size worth) + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + final = results[-1] + assert final["type"] == "final" + assert final["download_bytes"] == recv_bytes, ( + f"Expected download_bytes={recv_bytes}, got {final['download_bytes']}" + ) + + +@pytest.mark.trio +async def test_client_asymmetric_upload_download(security_protocol): + """Test asymmetric transfer: upload 100 bytes, download 1000 bytes.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + send_bytes = 100 + recv_bytes = 1000 + + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + final = results[-1] + assert final["type"] == "final" + assert final["upload_bytes"] == send_bytes + assert final["download_bytes"] == recv_bytes + + +@pytest.mark.trio +async def test_client_final_output_correct(security_protocol): + """Verify final PerfOutput has correct structure and byte counts.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + server = PerfService(host_a) + client = PerfService(host_b) + await server.start() + + send_bytes = 512 + recv_bytes = 256 + + results = [] + async for output in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + results.append(output) + + # Verify we got at least a final result + assert len(results) >= 1 + + final = results[-1] + assert final["type"] == "final" + assert final["upload_bytes"] == send_bytes + assert final["download_bytes"] == recv_bytes + assert final["time_seconds"] > 0 + + +def test_client_header_format(): + """Verify the 8-byte header format is correct (unit test).""" + # Test that the header format used by the client is correct + # The client uses struct.pack(">Q", recv_bytes) to create the header + + test_values = [0, 1, 100, 1024, 65536, 1024 * 1024] + + for recv_bytes in test_values: + header = struct.pack(">Q", recv_bytes) + + # Verify header is exactly 8 bytes + assert len(header) == 8 + + # Verify it can be correctly parsed back + parsed = struct.unpack(">Q", header)[0] + assert parsed == recv_bytes diff --git a/tests/core/perf/test_perf_errors.py b/tests/core/perf/test_perf_errors.py new file mode 100644 index 000000000..a19cac330 --- /dev/null +++ b/tests/core/perf/test_perf_errors.py @@ -0,0 +1,157 @@ +"""Tests for perf protocol error handling paths.""" + +import struct + +import pytest + +from libp2p.custom_types import TProtocol +from libp2p.perf import PerfService +from tests.utils.factories import host_pair_factory + +from .conftest import MockNetStream + +# ============================================================================= +# Server Error Path Tests +# ============================================================================= + + +@pytest.mark.trio +async def test_server_stream_reset_before_header(perf_service): + """Server handles stream reset/close before receiving any header bytes.""" + # Stream closes immediately with no data + stream = MockNetStream(b"", close_after_bytes=0) + await perf_service._handle_message(stream) + + # Server should reset the stream on error + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_incomplete_header(perf_service): + """Server handles incomplete header (< 8 bytes received before stream closes).""" + # Only 4 bytes of header, then stream closes + incomplete_header = b"\x00\x00\x00\x01" # Only 4 bytes + stream = MockNetStream(incomplete_header) + await perf_service._handle_message(stream) + + # Server should reset the stream when header is incomplete + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_error_during_read(perf_service): + """Server properly resets stream on read errors.""" + # Stream will raise an error on first read + stream = MockNetStream(b"", raise_on_read=True) + await perf_service._handle_message(stream) + + # Server should reset the stream on read error + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_read_error_after_partial_header(perf_service): + """Server resets stream when read error occurs after partial header.""" + # Provide 4 bytes, then error on next read + partial_header = b"\x00\x00\x00\x01" + stream = MockNetStream(partial_header, raise_after_bytes=4) + await perf_service._handle_message(stream) + + # Server should reset the stream + assert stream._reset_called is True + + +@pytest.mark.trio +async def test_server_resets_stream_when_service_stopped(perf_service): + """Stopped service should reject inbound perf streams.""" + await perf_service.stop() + stream = MockNetStream(struct.pack(">Q", 64) + b"x" * 32) + await perf_service._handle_message(stream) + + assert stream._reset_called is True + assert stream._write_buffer == b"" + + +# ============================================================================= +# Client Error Path Tests +# ============================================================================= + + +@pytest.mark.trio +async def test_client_stream_reset_mid_upload(security_protocol): + """Client handles stream reset during upload phase.""" + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Server that resets stream after reading some bytes + async def resetting_server_handler(stream): + # Read a bit then reset + await stream.read(100) + await stream.reset() + + host_a.set_stream_handler(TProtocol("/perf/1.0.0"), resetting_server_handler) + + client = PerfService(host_b) + + recv_bytes = 1000 + send_bytes = 10000 # Large upload to ensure we hit the reset + + # Should raise an exception due to stream reset + with pytest.raises(Exception): + async for _ in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + pass + + +@pytest.mark.trio +async def test_client_byte_count_mismatch(security_protocol): + """Client raises ValueError when server sends fewer bytes than requested.""" + from libp2p.network.stream.exceptions import StreamEOF + + async with host_pair_factory(security_protocol=security_protocol) as ( + host_a, + host_b, + ): + # Server that sends fewer bytes than the client expects + async def short_response_handler(stream): + # Read the 8-byte header to get requested bytes + header = await stream.read(8) + requested_bytes = struct.unpack(">Q", header)[0] + + # Drain the upload data (handle EOF when client finishes sending) + try: + while True: + data = await stream.read(4096) + if not data: + break + except StreamEOF: + pass # Expected when client closes write side + + # Intentionally send fewer bytes than requested + bytes_to_send = max(0, requested_bytes - 100) + await stream.write(b"\x00" * bytes_to_send) + await stream.close() + + host_a.set_stream_handler(TProtocol("/perf/1.0.0"), short_response_handler) + + client = PerfService(host_b) + + recv_bytes = 1000 # Request 1000 bytes + send_bytes = 100 + + # Should raise ValueError due to byte count mismatch + with pytest.raises(ValueError) as exc_info: + async for _ in client.measure_performance( + host_a.get_addrs()[0].encapsulate(f"/p2p/{host_a.get_id()}"), + send_bytes, + recv_bytes, + ): + pass + + # Verify the error message contains expected information + error_msg = str(exc_info.value) + assert "Expected to receive" in error_msg or "expected" in error_msg.lower() diff --git a/tests/core/perf/test_perf_server.py b/tests/core/perf/test_perf_server.py new file mode 100644 index 000000000..989389da4 --- /dev/null +++ b/tests/core/perf/test_perf_server.py @@ -0,0 +1,83 @@ +"""Tests for perf protocol server-side functionality.""" + +import struct + +import pytest + +from .conftest import MockNetStream + + +@pytest.mark.trio +async def test_parse_valid_8_byte_header(perf_service): + """Verify server correctly parses a valid 8-byte big-endian header.""" + # Create header requesting 1024 bytes back + recv_bytes = 1024 + header = struct.pack(">Q", recv_bytes) + # Add some upload data after header + upload_data = b"x" * 100 + + stream = MockNetStream(header + upload_data) + await perf_service._handle_message(stream) + + # Server should have written exactly 1024 bytes back + assert len(stream._write_buffer) == recv_bytes + assert stream._closed is True + assert stream._reset_called is False + + +@pytest.mark.trio +async def test_server_extracts_response_length(perf_service): + """Verify server extracts correct bytes_to_send_back value from header.""" + # Test with specific value: 5000 bytes + recv_bytes = 5000 + header = struct.pack(">Q", recv_bytes) + + stream = MockNetStream(header) + await perf_service._handle_message(stream) + + # Server should send exactly 5000 bytes + assert len(stream._write_buffer) == recv_bytes + + +@pytest.mark.trio +async def test_server_handles_zero_response_length(perf_service): + """Edge case: header requests 0 bytes back.""" + recv_bytes = 0 + header = struct.pack(">Q", recv_bytes) + upload_data = b"y" * 50 + + stream = MockNetStream(header + upload_data) + await perf_service._handle_message(stream) + + # Server should write 0 bytes back + assert len(stream._write_buffer) == 0 + assert stream._closed is True + + +@pytest.mark.trio +async def test_server_handles_large_response_length(perf_service): + """Edge case: header requests large bytes back (1MB).""" + recv_bytes = 1024 * 1024 # 1MB + header = struct.pack(">Q", recv_bytes) + + stream = MockNetStream(header) + await perf_service._handle_message(stream) + + # Server should send exactly 1MB + assert len(stream._write_buffer) == recv_bytes + assert stream._closed is True + + +@pytest.mark.trio +async def test_server_sends_correct_byte_count(perf_service): + """Verify server sends exactly the requested number of bytes.""" + test_cases = [1, 100, 1024, 65536] + + for recv_bytes in test_cases: + header = struct.pack(">Q", recv_bytes) + stream = MockNetStream(header) + await perf_service._handle_message(stream) + + assert len(stream._write_buffer) == recv_bytes, ( + f"Expected {recv_bytes} bytes, got {len(stream._write_buffer)}" + )