Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions libp2p/host/basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ def __init__(
# Automatic identify coordination
self._identify_inflight: set[ID] = set()
self._identified_peers: set[ID] = set()
self._identify_scopes: dict[ID, trio.CancelScope] = {}
self._network.register_notifee(_IdentifyNotifee(self))

def get_id(self) -> ID:
Expand Down Expand Up @@ -978,6 +979,34 @@ async def disconnect(self, peer_id: ID) -> None:
await self._network.close_peer(peer_id)

async def close(self) -> None:
"""
Close the host and its underlying network service.

Cleanup order is intentional: stop background services, remove UPnP port
mappings, cancel inflight identify tasks, then close the network.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking, but worth thinking about: calling close() twice could double-invoke self.mDNS.stop()zeroconf.close(), which may raise. And there's a potential overlap with the run() context manager's finally block (L430-439) which already stops mDNS and removes UPnP mappings.

A simple _closed flag in __init__ checked at the top of this method would make it idempotent and safe in either ordering. @acul71 raised the same concern. Up to you — just flagging the edge case.

"""
# Stop background services
if self.mDNS is not None:
self.mDNS.stop()

if self.bootstrap is not None:
self.bootstrap.stop()

# Cleanup UPnP mappings if active
if self.upnp and self.upnp.get_external_ip():
try:
logger.debug("Removing UPnP port mappings due to host closure")
for addr in self.get_addrs():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small inconsistency: this uses self.get_addrs() but the same UPnP cleanup in the run() context manager (around L435) uses self.get_transport_addrs().

Functionally both work since value_for_protocol("tcp") extracts the port either way, but get_addrs() returns the /p2p/{peer_id} suffixed form. Would be cleaner to use get_transport_addrs() here to match the run() path — easier to reason about if both cleanup routes look the same.

if port := addr.value_for_protocol("tcp"):
await self.upnp.remove_port_mapping(int(port), "TCP")
except Exception as e:
logger.warning(f"Error removing UPnP mappings during close: {e}")

# Cancel inflight identify tasks
for scope in list(self._identify_scopes.values()):
scope.cancel()

# Close network
await self._network.close()

def _schedule_identify(self, peer_id: ID, *, reason: str) -> None:
Expand All @@ -993,14 +1022,28 @@ def _schedule_identify(self, peer_id: ID, *, reason: str) -> None:
return
if not self._should_identify_peer(peer_id):
return

self._identify_inflight.add(peer_id)
trio.lowlevel.spawn_system_task(self._identify_task_entry, peer_id, reason)

async def _identify_task_entry(self, peer_id: ID, reason: str) -> None:
# Create a new cancel scope for this identify task
cancel_scope = trio.CancelScope()
self._identify_scopes[peer_id] = cancel_scope

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This blank line still has trailing whitespace — ruff fails CI with W293: Blank line contains whitespace. Flagged by @acul71 last round too. Quick fix: ruff check --fix libp2p/host/basic_host.py.

trio.lowlevel.spawn_system_task(
self._identify_task_entry, peer_id, reason, cancel_scope
)

async def _identify_task_entry(
self, peer_id: ID, reason: str, cancel_scope: trio.CancelScope
) -> None:
try:
await self._identify_peer(peer_id, reason=reason)
with cancel_scope:
await self._identify_peer(peer_id, reason=reason)
finally:
self._identify_inflight.discard(peer_id)
# Remove scope from tracking if it matches (to avoid race conditions)
if self._identify_scopes.get(peer_id) is cancel_scope:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call on the self._identify_scopes.get(peer_id) is cancel_scope check — prevents a new identify task for the same peer from having its scope yanked by a finishing old one. Clean pattern 👍

self._identify_scopes.pop(peer_id, None)

def _has_cached_protocols(self, peer_id: ID) -> bool:
"""
Expand Down
1 change: 1 addition & 0 deletions newsfragments/92.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
BasicHost and test infrastructure now perform proper shutdown and resource cleanup, eliminating "Task was destroyed but it is pending!" warnings.
11 changes: 7 additions & 4 deletions tests/core/host/test_basic_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ async def fake_negotiate(comm, timeout):
monkeypatch.setattr(host.multiselect, "negotiate", fake_negotiate)

# Now run the handler and expect StreamFailure
with pytest.raises(
StreamFailure, match="Failed to negotiate protocol: no protocol selected"
):
await host._swarm_stream_handler(net_stream)
try:
with pytest.raises(
StreamFailure, match="Failed to negotiate protocol: no protocol selected"
):
await host._swarm_stream_handler(net_stream)
finally:
await host.close()

# Ensure reset was called since negotiation failed
net_stream.reset.assert_awaited()
Expand Down
6 changes: 5 additions & 1 deletion tests/utils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,11 @@ async def create_batch_and_listen(
number, security_protocol=security_protocol, muxer_opt=muxer_opt
) as swarms:
hosts = tuple(BasicHost(swarm) for swarm in swarms)
yield hosts
try:
yield hosts
finally:
for host in hosts:
await host.close()


class DummyRouter(IPeerRouting):
Expand Down