diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 2b5adca27..0dd14bba1 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -279,6 +279,7 @@ def __init__( # Automatic identify coordination self._identify_inflight: set[ID] = set() self._identified_peers: set[ID] = set() + self._identify_scopes: dict[ID, trio.CancelScope] = {} self._network.register_notifee(_IdentifyNotifee(self)) def get_id(self) -> ID: @@ -978,6 +979,34 @@ async def disconnect(self, peer_id: ID) -> None: await self._network.close_peer(peer_id) async def close(self) -> None: + """ + Close the host and its underlying network service. + + Cleanup order is intentional: stop background services, remove UPnP port + mappings, cancel inflight identify tasks, then close the network. + """ + # Stop background services + if self.mDNS is not None: + self.mDNS.stop() + + if self.bootstrap is not None: + self.bootstrap.stop() + + # Cleanup UPnP mappings if active + if self.upnp and self.upnp.get_external_ip(): + try: + logger.debug("Removing UPnP port mappings due to host closure") + for addr in self.get_addrs(): + if port := addr.value_for_protocol("tcp"): + await self.upnp.remove_port_mapping(int(port), "TCP") + except Exception as e: + logger.warning(f"Error removing UPnP mappings during close: {e}") + + # Cancel inflight identify tasks + for scope in list(self._identify_scopes.values()): + scope.cancel() + + # Close network await self._network.close() def _schedule_identify(self, peer_id: ID, *, reason: str) -> None: @@ -993,14 +1022,28 @@ def _schedule_identify(self, peer_id: ID, *, reason: str) -> None: return if not self._should_identify_peer(peer_id): return + self._identify_inflight.add(peer_id) - trio.lowlevel.spawn_system_task(self._identify_task_entry, peer_id, reason) - async def _identify_task_entry(self, peer_id: ID, reason: str) -> None: + # Create a new cancel scope for this identify task + cancel_scope = trio.CancelScope() + self._identify_scopes[peer_id] = cancel_scope + + trio.lowlevel.spawn_system_task( + self._identify_task_entry, peer_id, reason, cancel_scope + ) + + async def _identify_task_entry( + self, peer_id: ID, reason: str, cancel_scope: trio.CancelScope + ) -> None: try: - await self._identify_peer(peer_id, reason=reason) + with cancel_scope: + await self._identify_peer(peer_id, reason=reason) finally: self._identify_inflight.discard(peer_id) + # Remove scope from tracking if it matches (to avoid race conditions) + if self._identify_scopes.get(peer_id) is cancel_scope: + self._identify_scopes.pop(peer_id, None) def _has_cached_protocols(self, peer_id: ID) -> bool: """ diff --git a/newsfragments/92.feature.rst b/newsfragments/92.feature.rst new file mode 100644 index 000000000..eddb5cdfb --- /dev/null +++ b/newsfragments/92.feature.rst @@ -0,0 +1 @@ +BasicHost and test infrastructure now perform proper shutdown and resource cleanup, eliminating "Task was destroyed but it is pending!" warnings. diff --git a/tests/core/host/test_basic_host.py b/tests/core/host/test_basic_host.py index 4b8a30be2..c7956c2db 100644 --- a/tests/core/host/test_basic_host.py +++ b/tests/core/host/test_basic_host.py @@ -83,10 +83,13 @@ async def fake_negotiate(comm, timeout): monkeypatch.setattr(host.multiselect, "negotiate", fake_negotiate) # Now run the handler and expect StreamFailure - with pytest.raises( - StreamFailure, match="Failed to negotiate protocol: no protocol selected" - ): - await host._swarm_stream_handler(net_stream) + try: + with pytest.raises( + StreamFailure, match="Failed to negotiate protocol: no protocol selected" + ): + await host._swarm_stream_handler(net_stream) + finally: + await host.close() # Ensure reset was called since negotiation failed net_stream.reset.assert_awaited() diff --git a/tests/utils/factories.py b/tests/utils/factories.py index 072c1c4c5..62a43fdf2 100644 --- a/tests/utils/factories.py +++ b/tests/utils/factories.py @@ -534,7 +534,11 @@ async def create_batch_and_listen( number, security_protocol=security_protocol, muxer_opt=muxer_opt ) as swarms: hosts = tuple(BasicHost(swarm) for swarm in swarms) - yield hosts + try: + yield hosts + finally: + for host in hosts: + await host.close() class DummyRouter(IPeerRouting):