diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 33e421eb4b4..86e8d1dffa6 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -1144,6 +1144,55 @@ void TCPTransportInterface::perform_listen_operation( } EPROSIMA_LOG_INFO(RTCP, "End PerformListenOperation " << channel->locator()); + + // If we get here, the channel has been disconnected. We might need to clean it up if + // the remote endpoint is the one that initiated the disconnection. + // We only delete acceptor channels, as connect channels need to be kept in channel_resources_ to restart the connection + if (channel && channel->tcp_connection_type() == TCPChannelResource::TCPConnectionType::TCP_ACCEPT_TYPE) + { + // Defer the erase to io_context_ so the TCPChannelResource destructor runs off the listener thread that is about to exit. + // Weak_ptr is used to avoid keeping the channel alive if it has already been removed from the maps by another thread. + asio::post(io_context_, [this, channel_weak]() + { + auto ch = channel_weak.lock(); + if (!ch) + { + return; + } + { + // Channel resources map case + std::unique_lock scoped_lock(sockets_map_mutex_); + bool erased = false; + // There might be multiple entries with the same channel. Delete them all + for (auto it = channel_resources_.begin(); it != channel_resources_.end(); ) + { + if (it->second == ch) + { + it = channel_resources_.erase(it); + erased = true; + } + else + { + ++it; + } + } + if (erased) + { + return; + } + } + // Unbound channel resources map case + std::unique_lock unbound_lock(unbound_map_mutex_); + auto it = std::find(unbound_channel_resources_.begin(), + unbound_channel_resources_.end(), ch); + if (it != unbound_channel_resources_.end()) + { + unbound_channel_resources_.erase(it); + } + }); + // Drop the listener's reference so the destructor cannot run on this thread + channel.reset(); + } } bool TCPTransportInterface::read_body( diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 1bae62093f5..cad15bb9199 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -2430,6 +2430,62 @@ TEST_F(TCPv4Tests, add_logical_port_on_send_resource_creation) } } +// This test verifies that TCP channels of type ACCEPT are correctly removed from the channel resources map when +// the channel is disabled by asio. This is the case when a client disconnects from the server. There is no need +// maintain the channel resource of a disconnected client because new connections will generate new channel resources +// and no unbind operation is needed at destruction time for a removed participant (eDisconnected channel). +TEST_F(TCPv4Tests, remove_stale_channel_resources_of_server) +{ + // Server + TCPv4TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(g_default_port); + serverDescriptor.keep_alive_frequency_ms = 1000; + serverDescriptor.keep_alive_timeout_ms = 2000; + MockTCPv4Transport server(serverDescriptor); + ASSERT_TRUE(server.init()); + + // Client + { + TCPv4TransportDescriptor clientDescriptor; + auto client = std::unique_ptr(new TCPv4Transport(clientDescriptor)); + ASSERT_TRUE(client->init()); + + Locator_t outputLocator; + outputLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); + IPLocator::setPhysicalPort(outputLocator, g_default_port); + IPLocator::setLogicalPort(outputLocator, 7410); + + SendResourceList send_resource_list; + ASSERT_TRUE(client->OpenOutputChannel(send_resource_list, outputLocator)); + + // Wait for the server to finish the BindConnectionRequest handshake + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + while (server.get_channel_resources_size() != 0 && + std::chrono::steady_clock::now() < deadline) + { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + // Ensure there are channel resources in the server. Bind socket adds an entry per interface available, so there could be more than one. + ASSERT_GT(server.get_channel_resources().size(), 0u); + + // Tear down the client: clean send_resource_list and then close the TCP socket. + send_resource_list.clear(); + client.reset(); + } + + // Check that the server correctly removes the channel resource of type ACCEPT after the client disconnection + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (server.get_channel_resources_size() != 0 && + std::chrono::steady_clock::now() < deadline) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(server.get_channel_resources().empty()); + EXPECT_TRUE(server.get_unbound_channel_resources().empty()); +} + + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/mock/MockTCPv4Transport.h b/test/unittest/transport/mock/MockTCPv4Transport.h index c02a26d4ca9..ad2a9eb77a0 100644 --- a/test/unittest/transport/mock/MockTCPv4Transport.h +++ b/test/unittest/transport/mock/MockTCPv4Transport.h @@ -40,6 +40,12 @@ class MockTCPv4Transport : public TCPv4Transport return channel_resources_; } + size_t get_channel_resources_size() const + { + std::lock_guard lock(sockets_map_mutex_); + return unbound_channel_resources_.size(); + } + const std::vector> get_unbound_channel_resources() const { return unbound_channel_resources_;