Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ TCPChannelResource::TCPChannelResource(
: ChannelResource(maxMsgSize)
, parent_ (parent)
, locator_(locator)
, waiting_for_keep_alive_(false)
, connection_status_(eConnectionStatus::eDisconnected)
, tcp_connection_type_(TCPConnectionType::TCP_CONNECT_TYPE)
{
Expand All @@ -65,7 +64,6 @@ TCPChannelResource::TCPChannelResource(
: ChannelResource(maxMsgSize)
, parent_(parent)
, locator_()
, waiting_for_keep_alive_(false)
, connection_status_(eConnectionStatus::eDisconnected)
, tcp_connection_type_(TCPConnectionType::TCP_ACCEPT_TYPE)
{
Expand Down Expand Up @@ -196,7 +194,6 @@ void TCPChannelResource::add_logical_port(
}
}
}

}

void TCPChannelResource::send_pending_open_logical_ports(
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/transport/TCPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class TCPChannelResource : public ChannelResource

TCPTransportInterface* parent_;
Locator locator_;
bool waiting_for_keep_alive_;
// Must be accessed after lock pending_logical_mutex_
std::map<TCPTransactionId, uint16_t> negotiating_logical_ports_;
std::map<TCPTransactionId, uint16_t> last_checked_logical_port_;
Expand Down
84 changes: 3 additions & 81 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,10 @@ namespace rtps {

using Log = fastdds::dds::Log;

static const int s_default_keep_alive_frequency = 5000; // 5 SECONDS
static const int s_default_keep_alive_timeout = 15000; // 15 SECONDS
//static const int s_clean_deleted_sockets_pool_timeout = 100; // 100 MILLISECONDS

TCPTransportDescriptor::TCPTransportDescriptor()
: SocketTransportDescriptor(s_maximumMessageSize, s_maximumInitialPeersRange)
, keep_alive_frequency_ms(s_default_keep_alive_frequency)
, keep_alive_timeout_ms(s_default_keep_alive_timeout)
, keep_alive_frequency_ms(0)
, keep_alive_timeout_ms(0)
, max_logical_port(100)
, logical_port_range(20)
, logical_port_increment(2)
Expand Down Expand Up @@ -168,7 +164,6 @@ TCPTransportInterface::TCPTransportInterface(
#if TLS_FOUND
, ssl_context_(asio::ssl::context::sslv23)
#endif // if TLS_FOUND
, keep_alive_event_(io_context_timers_)
{
}

Expand All @@ -181,13 +176,6 @@ void TCPTransportInterface::clean()
assert(receiver_resources_.size() == 0);
alive_.store(false);

keep_alive_event_.cancel();
if (io_context_timers_thread_.joinable())
{
io_context_timers_.stop();
io_context_timers_thread_.join();
}

{
std::vector<std::shared_ptr<TCPChannelResource>> channels;
std::vector<eprosima::fastdds::rtps::Locator> delete_channels;
Expand Down Expand Up @@ -583,15 +571,7 @@ bool TCPTransportInterface::init(

if (0 < configuration()->keep_alive_frequency_ms)
{
auto ioContextTimersFunction = [&]()
{
asio::executor_work_guard<asio::io_context::executor_type> work =
make_work_guard(io_context_timers_.
get_executor());
io_context_timers_.run();
};
io_context_timers_thread_ = create_thread(ioContextTimersFunction,
configuration()->keep_alive_thread, "dds.tcp_keep");
EPROSIMA_LOG_WARNING(RTCP, "Keep alive feature only available in Fast DDS Pro.");
}

return true;
Expand Down Expand Up @@ -1076,64 +1056,6 @@ bool TCPTransportInterface::OpenInputChannel(
return success;
}

void TCPTransportInterface::keep_alive()
{
std::map<Locator, std::shared_ptr<TCPChannelResource>> tmp_vec;

{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_); // Why mutex here?
tmp_vec = channel_resources_;
}


for (auto& channel_resource : tmp_vec)
{
if (TCPChannelResource::TCPConnectionType::TCP_CONNECT_TYPE == channel_resource.second->tcp_connection_type())
{
rtcp_message_manager_->sendKeepAliveRequest(channel_resource.second);
}
}
//TODO Check timeout.

/*
const TCPTransportDescriptor* config = configuration(); // Keep a copy for us.

std::chrono::time_point<std::chrono::system_clock> time_now = std::chrono::system_clock::now();
std::chrono::time_point<std::chrono::system_clock> next_time = time_now +
std::chrono::milliseconds(config->keep_alive_frequency_ms);
std::chrono::time_point<std::chrono::system_clock> timeout_time =
time_now + std::chrono::milliseconds(config->keep_alive_timeout_ms);

while (channel && TCPChannelResource::TCPConnectionStatus::TCP_CONNECTED == channel->tcp_connection_status())
{
if (channel->connection_established())
{
// KeepAlive
if (config->keep_alive_frequency_ms > 0 && config->keep_alive_timeout_ms > 0)
{
time_now = std::chrono::system_clock::now();

// Keep Alive Management
if (!channel->waiting_for_keep_alive_ && time_now > next_time)
{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_); // Why mutex here?
rtcp_message_manager_->sendKeepAliveRequest(channel);
channel->waiting_for_keep_alive_ = true;
next_time = time_now + std::chrono::milliseconds(config->keep_alive_frequency_ms);
timeout_time = time_now + std::chrono::milliseconds(config->keep_alive_timeout_ms);
}
else if (channel->waiting_for_keep_alive_ && time_now >= timeout_time)
{
// Disable the socket to erase it after the reception.
close_tcp_socket(channel);
}
}
}
}
EPROSIMA_LOG_INFO(RTCP, "End perform_rtcp_management_thread " << channel->locator());
*/
}

void TCPTransportInterface::create_listening_thread(
const std::shared_ptr<TCPChannelResource>& channel)
{
Expand Down
5 changes: 0 additions & 5 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ class TCPTransportInterface : public TransportInterface
asio::ssl::context ssl_context_;
#endif // if TLS_FOUND
eprosima::thread io_context_thread_;
eprosima::thread io_context_timers_thread_;
std::shared_ptr<RTCPMessageManager> rtcp_message_manager_;
std::mutex rtcp_message_manager_mutex_;
std::condition_variable rtcp_message_manager_cv_;
Expand All @@ -112,8 +111,6 @@ class TCPTransportInterface : public TransportInterface

std::vector<std::pair<TCPChannelResource*, uint64_t>> sockets_timestamp_;

asio::steady_timer keep_alive_event_;

std::map<Locator, std::shared_ptr<TCPAcceptor>> acceptors_;

eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_;
Expand Down Expand Up @@ -497,8 +494,6 @@ class TCPTransportInterface : public TransportInterface

virtual TCPTransportDescriptor* configuration() = 0;

void keep_alive();

void update_network_interfaces() override;

bool is_localhost_allowed() const override;
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ ResponseCode RTCPMessageManager::processOpenLogicalPortResponse(
}

ResponseCode RTCPMessageManager::processKeepAliveResponse(
std::shared_ptr<TCPChannelResource>& channel,
std::shared_ptr<TCPChannelResource>& /*channel*/,
ResponseCode respCode,
const TCPTransactionId& transaction_id)
{
Expand All @@ -654,7 +654,6 @@ ResponseCode RTCPMessageManager::processKeepAliveResponse(
switch (respCode)
{
case RETCODE_OK:
channel->waiting_for_keep_alive_ = false;
break;
case RETCODE_UNKNOWN_LOCATOR:
return RETCODE_UNKNOWN_LOCATOR;
Expand Down
Loading