From 84fe94bbf80257775c7ba33b6689dc2e567c770c Mon Sep 17 00:00:00 2001 From: Abhiswant Chaudhary Date: Sun, 10 May 2026 14:56:48 +0530 Subject: [PATCH] feat: add gossipsub large message example for 500KB payload delivery timing Signed-off-by: Abhiswant Chaudhary --- docs/examples.pubsub.rst | 47 +++++ examples/pubsub/gossipsub_large_message.py | 226 +++++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 274 insertions(+) create mode 100644 examples/pubsub/gossipsub_large_message.py diff --git a/docs/examples.pubsub.rst b/docs/examples.pubsub.rst index 8990e5c08..b921c2e02 100644 --- a/docs/examples.pubsub.rst +++ b/docs/examples.pubsub.rst @@ -62,3 +62,50 @@ The full source code for this example is below: .. literalinclude:: ../examples/pubsub/pubsub.py :language: python :linenos: + + +GossipSub Large Message Demo +============================ + +This example demonstrates one-shot large payload behavior using GossipSub. +It runs two peers: a receiver and a sender. The sender publishes one 500KB +message, and the receiver prints payload size plus end-to-end timing. + +.. code-block:: console + + $ gossipsub-large-message-demo + 2026-05-10 10:10:00,000 - gossipsub-large-message-demo - INFO - Running receiver mode + 2026-05-10 10:10:00,010 - gossipsub-large-message-demo - INFO - Receiver ready with peer ID: Qm... + 2026-05-10 10:10:00,010 - gossipsub-large-message-demo - INFO - Topic: gossipsub-large-message + 2026-05-10 10:10:00,010 - gossipsub-large-message-demo - INFO - Run this in a second terminal to send 500KB: + 2026-05-10 10:10:00,010 - gossipsub-large-message-demo - INFO - gossipsub-large-message-demo -d /ip4/127.0.0.1/tcp/12345/p2p/Qm... -t gossipsub-large-message + +Open a second terminal and run the printed command: + +.. code-block:: console + + $ gossipsub-large-message-demo -d /ip4/127.0.0.1/tcp/12345/p2p/Qm... -t gossipsub-large-message + 2026-05-10 10:10:05,000 - gossipsub-large-message-demo - INFO - Running sender mode + 2026-05-10 10:10:05,200 - gossipsub-large-message-demo - INFO - Publishing 512000-byte payload to topic 'gossipsub-large-message'... + 2026-05-10 10:10:05,260 - gossipsub-large-message-demo - INFO - Publish completed in 60.00 ms + +The receiver then prints confirmation and timing: + +.. code-block:: console + + 2026-05-10 10:10:05,300 - gossipsub-large-message-demo - INFO - Received payload: 512000 bytes on topic 'gossipsub-large-message' from Qm... + 2026-05-10 10:10:05,300 - gossipsub-large-message-demo - INFO - End-to-end elapsed time: 100.00 ms + +Command Line Options +-------------------- + +- ``-t, --topic``: Topic name to use (default: ``gossipsub-large-message``) +- ``-d, --destination``: Receiver address for sender mode +- ``-p, --port``: Local listen port (default: random available port) +- ``-v, --verbose``: Enable debug logging + +The full source code for this example is below: + +.. literalinclude:: ../examples/pubsub/gossipsub_large_message.py + :language: python + :linenos: diff --git a/examples/pubsub/gossipsub_large_message.py b/examples/pubsub/gossipsub_large_message.py new file mode 100644 index 000000000..25eed3ccf --- /dev/null +++ b/examples/pubsub/gossipsub_large_message.py @@ -0,0 +1,226 @@ +import argparse +import logging +import struct +import time + +import multiaddr +import trio + +from libp2p import ( + new_host, +) +from libp2p.crypto.rsa import ( + create_new_key_pair, +) +from libp2p.custom_types import ( + TProtocol, +) +from libp2p.peer.peerinfo import ( + info_from_p2p_addr, +) +from libp2p.peer.id import ( + ID, +) +from libp2p.pubsub.gossipsub import ( + GossipSub, +) +from libp2p.pubsub.pubsub import ( + Pubsub, +) +from libp2p.stream_muxer.mplex.mplex import ( + MPLEX_PROTOCOL_ID, + Mplex, +) +from libp2p.tools.anyio_service import ( + background_trio_service, +) +from libp2p.utils.address_validation import ( + find_free_port, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-large-message-demo") + +GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0") +TOPIC = "gossipsub-large-message" +PAYLOAD_SIZE_BYTES = 500 * 1024 +TIMESTAMP_BYTES = 8 + + +def build_large_payload() -> bytes: + sent_at = time.time() + body_size = PAYLOAD_SIZE_BYTES - TIMESTAMP_BYTES + if body_size <= 0: + raise ValueError("PAYLOAD_SIZE_BYTES must be larger than 8 bytes") + body = b"x" * body_size + return struct.pack("!d", sent_at) + body + + +def extract_timestamp(payload: bytes) -> float: + if len(payload) < TIMESTAMP_BYTES: + raise ValueError("payload does not include timestamp header") + return struct.unpack("!d", payload[:TIMESTAMP_BYTES])[0] + + +async def create_pubsub_stack() -> tuple: + host = new_host( + key_pair=create_new_key_pair(), + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + gossipsub = GossipSub( + protocols=[GOSSIPSUB_PROTOCOL_ID], + degree=3, + degree_low=2, + degree_high=4, + direct_peers=None, + time_to_live=60, + gossip_window=2, + gossip_history=5, + heartbeat_initial_delay=2.0, + heartbeat_interval=5, + ) + pubsub = Pubsub(host, gossipsub) + return host, gossipsub, pubsub + + +async def run_receiver(topic: str, port: int | None) -> None: + from libp2p.utils.address_validation import ( + get_available_interfaces, + get_optimal_binding_address, + ) + + if port is None or port == 0: + port = find_free_port() + logger.info("Using random available port: %s", port) + + host, gossipsub, pubsub = await create_pubsub_stack() + listen_addrs = get_available_interfaces(port) + + async with host.run(listen_addrs=listen_addrs): + async with background_trio_service(pubsub): + async with background_trio_service(gossipsub): + await pubsub.wait_until_ready() + subscription = await pubsub.subscribe(topic) + + optimal_addr = get_optimal_binding_address(port) + receiver_addr = f"{optimal_addr}/p2p/{host.get_id().to_string()}" + + logger.info("Receiver ready with peer ID: %s", host.get_id()) + logger.info("Topic: %s", topic) + logger.info("Run this in a second terminal to send 500KB:") + logger.info("gossipsub-large-message-demo -d %s -t %s", receiver_addr, topic) + + logger.info("Waiting for one large message...") + message = await subscription.get() + received_at = time.time() + sent_at = extract_timestamp(message.data) + elapsed_ms = (received_at - sent_at) * 1000 + sender_peer_id = ID(message.from_id).to_base58() + + logger.info( + "Received payload: %d bytes on topic '%s' from %s", + len(message.data), + topic, + sender_peer_id, + ) + logger.info("End-to-end elapsed time: %.2f ms", elapsed_ms) + + +async def run_sender(topic: str, destination: str, port: int | None) -> None: + from libp2p.utils.address_validation import ( + get_available_interfaces, + ) + + if port is None or port == 0: + port = find_free_port() + + host, gossipsub, pubsub = await create_pubsub_stack() + listen_addrs = get_available_interfaces(port) + + async with host.run(listen_addrs=listen_addrs): + async with background_trio_service(pubsub): + async with background_trio_service(gossipsub): + await pubsub.wait_until_ready() + + maddr = multiaddr.Multiaddr(destination) + info = info_from_p2p_addr(maddr) + await host.connect(info) + + await pubsub.wait_for_peer(info.peer_id) + await pubsub.wait_for_subscription(info.peer_id, topic) + + payload = build_large_payload() + logger.info( + "Publishing %d-byte payload to topic '%s'...", + len(payload), + topic, + ) + publish_start = time.perf_counter() + await pubsub.publish(topic, payload) + publish_ms = (time.perf_counter() - publish_start) * 1000 + logger.info("Publish completed in %.2f ms", publish_ms) + + +async def run(topic: str, destination: str | None, port: int | None) -> None: + if destination: + await run_sender(topic, destination, port) + else: + await run_receiver(topic, port) + + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "GossipSub large-message demo. Start receiver first, then run sender " + "with -d to publish one 500KB payload and print timing on receiver." + ) + ) + parser.add_argument( + "-t", + "--topic", + type=str, + default=TOPIC, + help="Topic name to use (default: gossipsub-large-message)", + ) + parser.add_argument( + "-d", + "--destination", + type=str, + default=None, + help="Receiver multiaddr with /p2p/ (sender mode)", + ) + parser.add_argument( + "-p", + "--port", + type=int, + default=None, + help="Local listen port (default: random free port)", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + if args.verbose: + logger.setLevel(logging.DEBUG) + + if args.destination: + logger.info("Running sender mode") + else: + logger.info("Running receiver mode") + + try: + trio.run(run, *(args.topic, args.destination, args.port)) + except KeyboardInterrupt: + logger.info("Application terminated by user") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index a12ceb021..71cdd9389 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ identify-push-demo = "examples.identify_push.identify_push_demo:run_main" identify-push-listener-dialer-demo = "examples.identify_push.identify_push_listener_dialer:main" pubsub-demo = "examples.pubsub.pubsub:main" floodsub-demo = "examples.pubsub.floodsub:main" +gossipsub-large-message-demo = "examples.pubsub.gossipsub_large_message:main" mdns-demo = "examples.mDNS.mDNS:main" circuit-relay-demo = "examples.circuit_relay.relay_example:main" tls-demo = "examples.tls.example_tls_server:main"