Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,55 @@ void TCPTransportInterface::perform_listen_operation(
}

EPROSIMA_LOG_INFO(RTCP, "End PerformListenOperation " << channel->locator());

// If we get here, the channel has been disconnected. We might need to clean it up if
// the remote endpoint is the one that initiated the disconnection.
// We only delete acceptor channels, as connect channels need to be kept in channel_resources_ to restart the connection
if (channel && channel->tcp_connection_type() == TCPChannelResource::TCPConnectionType::TCP_ACCEPT_TYPE)
{
// Defer the erase to io_context_ so the TCPChannelResource destructor runs off the listener thread that is about to exit.
// Weak_ptr is used to avoid keeping the channel alive if it has already been removed from the maps by another thread.
asio::post(io_context_, [this, channel_weak]()
{
auto ch = channel_weak.lock();
if (!ch)
{
return;
}
{
// Channel resources map case
std::unique_lock<std::mutex> scoped_lock(sockets_map_mutex_);
bool erased = false;
// There might be multiple entries with the same channel. Delete them all
for (auto it = channel_resources_.begin(); it != channel_resources_.end(); )
{
if (it->second == ch)
{
it = channel_resources_.erase(it);
erased = true;
}
else
{
++it;
}
}
if (erased)
{
return;
}
}
// Unbound channel resources map case
std::unique_lock<std::mutex> unbound_lock(unbound_map_mutex_);
auto it = std::find(unbound_channel_resources_.begin(),
unbound_channel_resources_.end(), ch);
if (it != unbound_channel_resources_.end())
{
unbound_channel_resources_.erase(it);
}
});
// Drop the listener's reference so the destructor cannot run on this thread
channel.reset();
}
}

bool TCPTransportInterface::read_body(
Expand Down
56 changes: 56 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2430,6 +2430,62 @@ TEST_F(TCPv4Tests, add_logical_port_on_send_resource_creation)
}
}

// This test verifies that TCP channels of type ACCEPT are correctly removed from the channel resources map when
// the channel is disabled by asio. This is the case when a client disconnects from the server. There is no need
// maintain the channel resource of a disconnected client because new connections will generate new channel resources
// and no unbind operation is needed at destruction time for a removed participant (eDisconnected channel).
TEST_F(TCPv4Tests, remove_stale_channel_resources_of_server)
{
// Server
TCPv4TransportDescriptor serverDescriptor;
serverDescriptor.add_listener_port(g_default_port);
serverDescriptor.keep_alive_frequency_ms = 1000;
serverDescriptor.keep_alive_timeout_ms = 2000;
MockTCPv4Transport server(serverDescriptor);
ASSERT_TRUE(server.init());

// Client
{
TCPv4TransportDescriptor clientDescriptor;
auto client = std::unique_ptr<TCPv4Transport>(new TCPv4Transport(clientDescriptor));
ASSERT_TRUE(client->init());

Locator_t outputLocator;
outputLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(outputLocator, 127, 0, 0, 1);
IPLocator::setPhysicalPort(outputLocator, g_default_port);
IPLocator::setLogicalPort(outputLocator, 7410);

SendResourceList send_resource_list;
ASSERT_TRUE(client->OpenOutputChannel(send_resource_list, outputLocator));

// Wait for the server to finish the BindConnectionRequest handshake
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
while (server.get_channel_resources_size() != 0 &&
std::chrono::steady_clock::now() < deadline)
{
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// Ensure there are channel resources in the server. Bind socket adds an entry per interface available, so there could be more than one.
ASSERT_GT(server.get_channel_resources().size(), 0u);

// Tear down the client: clean send_resource_list and then close the TCP socket.
send_resource_list.clear();
client.reset();
}

// Check that the server correctly removes the channel resource of type ACCEPT after the client disconnection
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
while (server.get_channel_resources_size() != 0 &&
std::chrono::steady_clock::now() < deadline)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_TRUE(server.get_channel_resources().empty());
EXPECT_TRUE(server.get_unbound_channel_resources().empty());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

size_t get_unbound_channel_resources_size() const
{
    std::lock_guard<std::mutex> lock(unbound_map_mutex_);
    return unbound_channel_resources_.size();
}

}


void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
6 changes: 6 additions & 0 deletions test/unittest/transport/mock/MockTCPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class MockTCPv4Transport : public TCPv4Transport
return channel_resources_;
}

size_t get_channel_resources_size() const
{
std::lock_guard<std::mutex> lock(sockets_map_mutex_);
return unbound_channel_resources_.size();
}

const std::vector<std::shared_ptr<TCPChannelResource>> get_unbound_channel_resources() const
{
return unbound_channel_resources_;
Expand Down
Loading