diff --git a/CMakeLists.txt b/CMakeLists.txt index 88dcbb65..a9c9c097 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -285,6 +285,8 @@ target_sources(${FGE_SERVER_LIB_NAME} PRIVATE sources/network/C_packet.cpp sources/network/C_packetLZ4.cpp sources/network/C_server.cpp + sources/network/C_netServer.cpp + sources/network/C_netClient.cpp sources/network/C_socket.cpp sources/network/C_netCommand.cpp sources/network/C_protocol.cpp) @@ -403,6 +405,8 @@ target_sources(${FGE_LIB_NAME} PRIVATE sources/network/C_packet.cpp sources/network/C_packetLZ4.cpp sources/network/C_server.cpp + sources/network/C_netServer.cpp + sources/network/C_netClient.cpp sources/network/C_socket.cpp sources/network/C_netCommand.cpp sources/network/C_protocol.cpp) diff --git a/examples/clientServerLifeSimulator_004/client/main.cpp b/examples/clientServerLifeSimulator_004/client/main.cpp index 26ac628e..4509d893 100644 --- a/examples/clientServerLifeSimulator_004/client/main.cpp +++ b/examples/clientServerLifeSimulator_004/client/main.cpp @@ -275,7 +275,7 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) std::cout << "connection ok" << std::endl; server.enableReturnPacket(true); - server._client.getPacketReorderer().setMaximumSize(FGE_NET_PACKET_REORDERER_CACHE_COMPUTE( + server.getClientContext()._reorderer.setMaximumSize(FGE_NET_PACKET_REORDERER_CACHE_COMPUTE( FGE_NET_DEFAULT_RETURN_PACKET_RATE.count(), LIFESIM_SERVER_TICK)); auto transmissionPacket = fge::net::CreatePacket(ls::LS_PROTOCOL_C_PLEASE_CONNECT_ME); diff --git a/examples/clientServerLifeSimulator_004/server/main.cpp b/examples/clientServerLifeSimulator_004/server/main.cpp index 862cd21f..59bddee7 100644 --- a/examples/clientServerLifeSimulator_004/server/main.cpp +++ b/examples/clientServerLifeSimulator_004/server/main.cpp @@ -138,6 +138,7 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) //Handling clients connection serverFlux->_onClientConnected.addLambda([](fge::net::ClientSharedPtr const& client, fge::net::Identity const& id) { client->getStatus().setTimeout(LIFESIM_TIME_TIMEOUT); + std::cout << "New user connected and now trying to authenticate : " << id.toString() << std::endl; }); //Handling clients return packet @@ -212,7 +213,7 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) fge::net::ReceivedPacketPtr packet; fge::net::FluxProcessResults processResult; do { - processResult = serverFlux->process(client, packet, true); + processResult = serverFlux->process(client, packet); if (processResult != fge::net::FluxProcessResults::USER_RETRIEVABLE) { continue; @@ -274,12 +275,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) transmissionPacket->packet() << true; transmissionPacket->doNotReorder(); - std::cout << "new user : " << packet->getIdentity().toString() << " connected !" + std::cout << "new user : " << packet->getIdentity().toString() << " authenticated !" << std::endl; - //Create the new client with the packet identity - //client = std::make_shared(); - //clients.add(packet->getIdentity(), client); + //Authenticate the new client client->getStatus().setNetworkStatus(fge::net::ClientStatus::NetworkStatus::AUTHENTICATED); //Pack data required by the LatencyPlanner in order to compute latency diff --git a/includes/FastEngine/network/C_client.hpp b/includes/FastEngine/network/C_client.hpp index 9dbc40cc..e6da86b7 100644 --- a/includes/FastEngine/network/C_client.hpp +++ b/includes/FastEngine/network/C_client.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #define FGE_NET_DEFAULT_LATENCY 20 #define FGE_NET_CLIENT_TIMESTAMP_MODULO 65536 @@ -212,6 +213,15 @@ class FGE_API ClientStatus std::chrono::steady_clock::time_point g_currentTimeout{std::chrono::steady_clock::now()}; }; +struct FGE_API CryptInfo +{ + ~CryptInfo(); + + void* _ssl{nullptr}; + void* _rbio{nullptr}; + void* _wbio{nullptr}; +}; + /** * \class Client * \brief Class that represent the identity of a client @@ -219,13 +229,6 @@ class FGE_API ClientStatus class FGE_API Client { public: - struct CryptInfo - { - void* _ssl{nullptr}; - void* _rbio{nullptr}; - void* _wbio{nullptr}; - }; - Client(); ~Client(); /** @@ -390,13 +393,8 @@ class FGE_API Client void resetLastReorderedPacketCounter(); [[nodiscard]] ProtocolPacket::CounterType getLastReorderedPacketCounter() const; - [[nodiscard]] PacketReorderer& getPacketReorderer(); - [[nodiscard]] PacketReorderer const& getPacketReorderer() const; - - [[nodiscard]] DataLockPair getPacketCache(); - [[nodiscard]] DataLockPair getPacketCache() const; void acknowledgeReception(ReceivedPacketPtr const& packet); - [[nodiscard]] std::vector const& getAcknowledgedList() const; + [[nodiscard]] std::unordered_set const& getAcknowledgedList() const; void clearAcknowledgedList(); void clearLostPacketCount(); @@ -439,9 +437,7 @@ class FGE_API Client ProtocolPacket::CounterType g_lastReorderedPacketCounter{0}; ProtocolPacket::CounterType g_clientPacketCounter{0}; - std::vector g_acknowledgedPackets; - PacketCache g_packetCache; - PacketReorderer g_packetReorderer; + std::unordered_set g_acknowledgedPackets; uint32_t g_lostPacketCount{0}; uint32_t g_lostPacketThreshold{FGE_NET_DEFAULT_lOST_PACKET_THRESHOLD}; diff --git a/includes/FastEngine/network/C_clientList.hpp b/includes/FastEngine/network/C_clientList.hpp index 0555e06e..45f0eb8b 100644 --- a/includes/FastEngine/network/C_clientList.hpp +++ b/includes/FastEngine/network/C_clientList.hpp @@ -34,21 +34,12 @@ class SocketUdp; using ClientSharedPtr = std::shared_ptr; -/** - * \struct ClientListEvent - * \ingroup network - * \brief Represents an event on the client list (client added, client removed, ...) - */ -struct ClientListEvent +struct ClientContext { - enum Events : uint8_t - { - CLEVT_DELCLIENT = 0, - CLEVT_NEWCLIENT - }; - - Events _event; - Identity _id; + PacketDefragmentation _defragmentation; + PacketCache _cache; + PacketReorderer _reorderer; + CommandQueue _commands; }; /** @@ -66,14 +57,34 @@ class FGE_API ClientList {} ClientSharedPtr _client; - PacketDefragmentation _defragmentation; - CommandQueue _commands; + ClientContext _context; + }; + + /** + * \struct Event + * \ingroup network + * \brief Represents an event on the client list (client added, client removed, ...) + */ + struct Event + { + enum class Types : uint8_t + { + EVT_DELCLIENT = 0, + EVT_NEWCLIENT + }; + using Types_t = std::underlying_type_t; + + inline Event(Types eventType, Identity const& clientId) : + _event(eventType), + _id(clientId) + {} - std::future _mtuFuture; + Types _event; + Identity _id; }; using DataList = std::unordered_map; - using EventList = std::deque; + using EventList = std::deque; ClientList() = default; ~ClientList() = default; @@ -100,6 +111,7 @@ class FGE_API ClientList */ void sendToAll(TransmitPacketPtr const& pck) const; + bool moveTo(ClientList& targetList, Identity const& id); /** * \brief Add a client to the list * @@ -190,13 +202,13 @@ class FGE_API ClientList * * \param evt A client event */ - void pushClientEvent(ClientListEvent const& evt); + void pushClientEvent(Event const& evt); /** * \brief Get the client event with its index * * \return The client event */ - ClientListEvent const& getClientEvent(std::size_t index) const; + Event const& getClientEvent(std::size_t index) const; /** * \brief Get the number of client events * diff --git a/includes/FastEngine/network/C_netCommand.hpp b/includes/FastEngine/network/C_netCommand.hpp index 7cf2538a..b4a78725 100644 --- a/includes/FastEngine/network/C_netCommand.hpp +++ b/includes/FastEngine/network/C_netCommand.hpp @@ -66,7 +66,8 @@ enum class NetCommandTypes { DISCOVER_MTU, CONNECT, - DISCONNECT + DISCONNECT, + CONNECT_HANDLER }; enum class NetCommandResults { @@ -91,35 +92,41 @@ class NetCommand IpAddress::Types addressType, Client& client, std::chrono::milliseconds deltaTime); - [[nodiscard]] virtual NetCommandResults - onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) = 0; + virtual void onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) = 0; [[nodiscard]] virtual std::chrono::milliseconds getTimeoutTarget() const; protected: - [[nodiscard]] virtual NetCommandResults - internalUpdate(TransmitPacketPtr& buffPacket, IpAddress::Types addressType, Client& client) = 0; + virtual void internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + Client& client, + std::chrono::milliseconds deltaTime) = 0; [[nodiscard]] virtual NetCommandResults timeout(Client& client); void resetTimeout(); + void markAsFailed(); + void markAsSucceeded(); + CommandQueue* _g_commandQueue{nullptr}; private: std::chrono::milliseconds g_timeout{0}; + NetCommandResults g_currentResultState{NetCommandResults::WORKING}; }; -class FGE_API NetMTUCommand : public NetCommand +class FGE_API NetMTUCommand final : public NetCommand { public: using NetCommand::NetCommand; - ~NetMTUCommand() final = default; + ~NetMTUCommand() override = default; [[nodiscard]] NetCommandTypes getType() const override { return NetCommandTypes::DISCOVER_MTU; } - [[nodiscard]] NetCommandResults - internalUpdate(TransmitPacketPtr& buffPacket, IpAddress::Types addressType, Client& client) override; - [[nodiscard]] NetCommandResults - onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; + void internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + Client& client, + std::chrono::milliseconds deltaTime) override; + void onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; [[nodiscard]] inline std::future get_future() { return this->g_promise.get_future(); } @@ -144,21 +151,22 @@ class FGE_API NetMTUCommand : public NetCommand } g_state{States::ASKING}; }; -class FGE_API NetConnectCommand : public NetCommand +class FGE_API NetConnectCommand final : public NetCommand { public: using NetCommand::NetCommand; - ~NetConnectCommand() final = default; + ~NetConnectCommand() override = default; void setVersioningString(std::string_view versioningString); [[nodiscard]] std::string const& getVersioningString() const; [[nodiscard]] NetCommandTypes getType() const override { return NetCommandTypes::CONNECT; } - [[nodiscard]] NetCommandResults - internalUpdate(TransmitPacketPtr& buffPacket, IpAddress::Types addressType, Client& client) override; - [[nodiscard]] NetCommandResults - onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; + void internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + Client& client, + std::chrono::milliseconds deltaTime) override; + void onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; [[nodiscard]] inline std::future get_future() { return this->g_promise.get_future(); } @@ -189,18 +197,61 @@ class FGE_API NetConnectCommand : public NetCommand std::string g_versioningString; }; -class FGE_API NetDisconnectCommand : public NetCommand +class FGE_API NetConnectHandlerCommand final : public NetCommand { public: using NetCommand::NetCommand; - ~NetDisconnectCommand() final = default; + ~NetConnectHandlerCommand() override = default; - [[nodiscard]] NetCommandTypes getType() const override { return NetCommandTypes::CONNECT; } + [[nodiscard]] NetCommandTypes getType() const override { return NetCommandTypes::CONNECT_HANDLER; } + + void internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + Client& client, + std::chrono::milliseconds deltaTime) override; + void onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; + + [[nodiscard]] inline std::future get_future() { return this->g_promise.get_future(); } + + [[nodiscard]] inline std::chrono::milliseconds getTimeoutTarget() const override + { + return FGE_NET_CONNECT_TIMEOUT_MS; + } + +private: + [[nodiscard]] NetCommandResults timeout(Client& client) override; + + std::promise g_promise; + enum class States + { + LOOKUP_MTU, + + DEALING_WITH_MTU, + WAITING_CLIENT_FINAL_MTU, + + CRYPT_HANDSHAKE, + CRYPT_WAITING, + + CONNECTED + } g_state{States::LOOKUP_MTU}; + + std::future g_mtuFuture; + NetMTUCommand g_mtuCommand{this->_g_commandQueue}; +}; + +class FGE_API NetDisconnectCommand final : public NetCommand +{ +public: + using NetCommand::NetCommand; + ~NetDisconnectCommand() override = default; + + [[nodiscard]] NetCommandTypes getType() const override { return NetCommandTypes::DISCONNECT; } - [[nodiscard]] NetCommandResults - internalUpdate(TransmitPacketPtr& buffPacket, IpAddress::Types addressType, Client& client) override; - [[nodiscard]] NetCommandResults - onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; + void internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + Client& client, + std::chrono::milliseconds deltaTime) override; + void onReceive(std::unique_ptr& packet, IpAddress::Types addressType, Client& client) override; [[nodiscard]] inline std::future get_future() { return this->g_promise.get_future(); } diff --git a/includes/FastEngine/network/C_protocol.hpp b/includes/FastEngine/network/C_protocol.hpp index ce8535fc..756e6ece 100644 --- a/includes/FastEngine/network/C_protocol.hpp +++ b/includes/FastEngine/network/C_protocol.hpp @@ -287,8 +287,13 @@ class PacketDefragmentation { public: PacketDefragmentation() = default; + PacketDefragmentation(PacketDefragmentation const& r) = delete; + PacketDefragmentation(PacketDefragmentation&& r) noexcept = default; ~PacketDefragmentation() = default; + PacketDefragmentation& operator=(PacketDefragmentation const& r) = delete; + PacketDefragmentation& operator=(PacketDefragmentation&& r) noexcept = default; + enum class Results { RETRIEVABLE, @@ -420,18 +425,31 @@ class FGE_API PacketCache { return this->_counter == r._counter && this->_realm == r._realm; } + + struct Hash + { + [[nodiscard]] inline std::size_t operator()(Label const& label) const + { + static_assert(sizeof(label._counter) == sizeof(label._realm) && sizeof(label._counter) == 2, + "ProtocolPacket::CounterType and ProtocolPacket::RealmType must be 16 bits"); + return std::hash()(static_cast(label._counter) << 16 | + static_cast(label._realm)); + } + }; }; PacketCache() = default; PacketCache(PacketCache const& r) = delete; - PacketCache(PacketCache&& r) noexcept = default; + PacketCache(PacketCache&& r) noexcept; ~PacketCache() = default; PacketCache& operator=(PacketCache const& r) = delete; - PacketCache& operator=(PacketCache&& r) noexcept = default; + PacketCache& operator=(PacketCache&& r) noexcept; void clear(); [[nodiscard]] bool isEmpty() const; + [[nodiscard]] bool isEnabled() const; + void enable(bool enable); //Transmit void push(TransmitPacketPtr const& packet); @@ -457,10 +475,14 @@ class FGE_API PacketCache std::chrono::steady_clock::time_point _time{}; }; + mutable std::mutex g_mutex; + //Circular buffer std::vector g_cache{FGE_NET_PACKET_CACHE_MAX}; std::size_t g_start{0}; std::size_t g_end{0}; + + bool g_enable{false}; }; } // namespace fge::net diff --git a/includes/FastEngine/network/C_server.hpp b/includes/FastEngine/network/C_server.hpp index f784492d..a702739e 100644 --- a/includes/FastEngine/network/C_server.hpp +++ b/includes/FastEngine/network/C_server.hpp @@ -43,6 +43,7 @@ } #define FGE_SERVER_PACKET_RECEPTION_TIMEOUT_MS 250 +#define FGE_SERVER_CLIENTS_MAP_GC_DELAY_MS 5000 namespace fge { @@ -73,6 +74,28 @@ enum class ReturnEvents REVT_CUSTOM }; +class FGE_API ReturnPacketHandler +{ +public: + ReturnPacketHandler() = default; + virtual ~ReturnPacketHandler() = default; + + [[nodiscard]] std::optional + handleReturnPacket(ClientSharedPtr const& refClient, ClientContext& clientContext, ReceivedPacketPtr& packet) const; + + mutable CallbackHandler _onClientReturnPacket; + mutable CallbackHandler _onClientReturnEvent; + mutable CallbackHandler _onClientSimpleReturnEvent; + mutable CallbackHandler + _onClientObjectReturnEvent; + mutable CallbackHandler _onClientAskFullUpdate; +}; + /** * \class NetFluxUdp * \ingroup network @@ -87,7 +110,7 @@ enum class ReturnEvents class FGE_API NetFluxUdp { public: - NetFluxUdp() = default; + NetFluxUdp(bool defaultFlux); NetFluxUdp(NetFluxUdp const& r) = delete; NetFluxUdp(NetFluxUdp&& r) noexcept = delete; virtual ~NetFluxUdp(); @@ -104,13 +127,16 @@ class FGE_API NetFluxUdp void setMaxPackets(std::size_t n); [[nodiscard]] std::size_t getMaxPackets() const; + [[nodiscard]] bool isDefaultFlux() const; + protected: bool pushPacket(ReceivedPacketPtr&& fluxPck); void forcePushPacket(ReceivedPacketPtr fluxPck); void forcePushPacketFront(ReceivedPacketPtr fluxPck); - [[nodiscard]] FluxProcessResults processReorder(Client& client, + [[nodiscard]] FluxProcessResults processReorder(PacketReorderer& reorderer, ReceivedPacketPtr& packet, ProtocolPacket::CounterType currentCounter, + ProtocolPacket::RealmType clientRealm, bool ignoreRealm); mutable std::mutex _g_mutexFlux; @@ -119,22 +145,19 @@ class FGE_API NetFluxUdp private: std::size_t g_maxPackets = FGE_SERVER_DEFAULT_MAXPACKET; + bool g_isDefaultFlux{false}; friend class ServerSideNetUdp; }; -class FGE_API ServerNetFluxUdp : public NetFluxUdp +class FGE_API ServerNetFluxUdp : public NetFluxUdp, public ReturnPacketHandler { public: - explicit ServerNetFluxUdp(ServerSideNetUdp& server) : - NetFluxUdp(), - g_server(&server) - {} + ServerNetFluxUdp(ServerSideNetUdp& server, bool defaultFlux); ~ServerNetFluxUdp() override = default; void processClients(); - [[nodiscard]] FluxProcessResults - process(ClientSharedPtr& refClient, ReceivedPacketPtr& packet, bool allowUnknownClient); + [[nodiscard]] FluxProcessResults process(ClientSharedPtr& refClient, ReceivedPacketPtr& packet); void disconnectAllClients(std::chrono::milliseconds delay = std::chrono::milliseconds(0)) const; @@ -144,30 +167,16 @@ class FGE_API ServerNetFluxUdp : public NetFluxUdp CallbackHandler _onClientTimeout; CallbackHandler _onClientAcknowledged; - CallbackHandler _onClientMTUDiscovered; CallbackHandler _onClientConnected; CallbackHandler _onClientDisconnected; CallbackHandler _onClientDropped; - CallbackHandler _onClientReturnPacket; - CallbackHandler _onClientReturnEvent; - CallbackHandler _onClientSimpleReturnEvent; - CallbackHandler - _onClientObjectReturnEvent; - CallbackHandler _onClientAskFullUpdate; - private: [[nodiscard]] bool verifyRealm(ClientSharedPtr const& refClient, ReceivedPacketPtr const& packet); - [[nodiscard]] NetCommandResults - checkCommands(ClientSharedPtr const& refClient, CommandQueue& commands, ReceivedPacketPtr& packet); [[nodiscard]] FluxProcessResults processUnknownClient(ClientSharedPtr& refClient, ReceivedPacketPtr& packet); - [[nodiscard]] FluxProcessResults processAcknowledgedClient(ClientList::Data& refClientData, - ReceivedPacketPtr& packet); - [[nodiscard]] FluxProcessResults processMTUDiscoveredClient(ClientList::Data& refClientData, - ReceivedPacketPtr& packet); - ServerSideNetUdp* g_server{nullptr}; + ServerSideNetUdp* const g_server{nullptr}; std::chrono::milliseconds g_commandsUpdateTick{0}; std::chrono::steady_clock::time_point g_lastCommandUpdateTimePoint{std::chrono::steady_clock::now()}; }; @@ -243,7 +252,7 @@ class FGE_API ServerSideNetUdp void notifyTransmission(); [[nodiscard]] bool isRunning() const; - void notifyNewClient(Identity const& identity, ClientSharedPtr const& client); + [[nodiscard]] bool announceNewClient(Identity const& identity, ClientSharedPtr const& client); void sendTo(TransmitPacketPtr& pck, Client const& client, Identity const& id); void sendTo(TransmitPacketPtr& pck, Identity const& id); @@ -312,6 +321,8 @@ class FGE_API ClientSideNetUdp : public NetFluxUdp [[nodiscard]] std::size_t waitForPackets(std::chrono::milliseconds time_ms); [[nodiscard]] Identity const& getClientIdentity() const; + [[nodiscard]] ClientContext const& getClientContext() const; + [[nodiscard]] ClientContext& getClientContext(); template void sendTo(TransmitPacketPtr& pck, Identity const& id); @@ -324,12 +335,13 @@ class FGE_API ClientSideNetUdp : public NetFluxUdp void endReturnEvent(); void simpleReturnEvent(uint16_t id); - void askFullUpdateReturnEvent(); + bool askFullUpdateReturnEvent(); void enableReturnPacket(bool enable); [[nodiscard]] bool isReturnPacketEnabled() const; [[nodiscard]] TransmitPacketPtr prepareAndRetrieveReturnPacket(); + [[nodiscard]] std::optional loopbackReturnPacket(ReturnPacketHandler const& handler); Client _client; //But it is the server :O @@ -342,9 +354,6 @@ class FGE_API ClientSideNetUdp : public NetFluxUdp void threadReception(); void threadTransmission(); - std::recursive_mutex g_mutexCommands; - CommandQueue g_commands; - std::unique_ptr g_threadReception; std::unique_ptr g_threadTransmission; @@ -356,7 +365,8 @@ class FGE_API ClientSideNetUdp : public NetFluxUdp Identity g_clientIdentity; - PacketDefragmentation g_defragmentation; + std::recursive_mutex g_mutexCommands; + ClientContext g_clientContext; bool g_returnPacketEnabled{false}; TransmitPacketPtr g_returnPacket; diff --git a/includes/private/fge_crypt.hpp b/includes/private/fge_crypt.hpp index a36aa594..56ad1de3 100644 --- a/includes/private/fge_crypt.hpp +++ b/includes/private/fge_crypt.hpp @@ -18,7 +18,13 @@ #define _FGE_FGE_CRYPT_HPP_INCLUDED #include "FastEngine/network/C_client.hpp" -#include "FastEngine/network/C_packet.hpp" + +namespace fge::net +{ + +class Packet; + +} // namespace fge::net namespace fge::priv { @@ -27,9 +33,9 @@ namespace fge::priv [[nodiscard]] bool CryptServerInit(void*& ctx); void CryptUninit(void*& ctx); -[[nodiscard]] bool CryptClientCreate(void* ctx, net::Client& client); -[[nodiscard]] bool CryptServerCreate(void* ctx, net::Client& client); -void CryptClientDestroy(net::Client& client); +[[nodiscard]] bool CryptClientCreate(void* ctx, net::CryptInfo& client); +[[nodiscard]] bool CryptServerCreate(void* ctx, net::CryptInfo& client); +void CryptClientDestroy(net::CryptInfo& client); [[nodiscard]] bool CryptEncrypt(net::Client& client, net::Packet& packet); [[nodiscard]] bool CryptDecrypt(net::Client& client, net::Packet& packet); diff --git a/sources/C_scene.cpp b/sources/C_scene.cpp index 5ae52bb5..26b92fc6 100644 --- a/sources/C_scene.cpp +++ b/sources/C_scene.cpp @@ -1579,7 +1579,7 @@ void Scene::clientsCheckup(fge::net::ClientList const& clients, bool force) for (std::size_t i = 0; i < clients.getClientEventSize(); ++i) { auto const& evt = clients.getClientEvent(i); - if (evt._event == fge::net::ClientListEvent::CLEVT_DELCLIENT) + if (evt._event == fge::net::ClientList::Event::Types::EVT_DELCLIENT) { this->g_perClientSyncs.erase(evt._id); } diff --git a/sources/network/C_client.cpp b/sources/network/C_client.cpp index a37812be..150eaed6 100644 --- a/sources/network/C_client.cpp +++ b/sources/network/C_client.cpp @@ -102,6 +102,13 @@ bool ClientStatus::isTimeout() const return false; } +//CryptInfo + +CryptInfo::~CryptInfo() +{ + priv::CryptClientDestroy(*this); +} + //Client Client::Client() : @@ -111,10 +118,7 @@ Client::Client() : g_lastPacketTimePoint(std::chrono::steady_clock::now()), g_lastRealmChangeTimePoint(std::chrono::steady_clock::now()) {} -Client::~Client() -{ - priv::CryptClientDestroy(*this); -} +Client::~Client() = default; Client::Client(Latency_ms CTOSLatency, Latency_ms STOCLatency) : g_correctorTimestamp(std::nullopt), g_CTOSLatency_ms(CTOSLatency), @@ -364,29 +368,12 @@ ProtocolPacket::CounterType Client::getLastReorderedPacketCounter() const return this->g_lastReorderedPacketCounter; } -PacketReorderer& Client::getPacketReorderer() -{ - return this->g_packetReorderer; -} -PacketReorderer const& Client::getPacketReorderer() const -{ - return this->g_packetReorderer; -} - -DataLockPair Client::getPacketCache() -{ - return DataLockPair(&this->g_packetCache, this->g_mutex); -} -DataLockPair Client::getPacketCache() const -{ - return DataLockPair(&this->g_packetCache, this->g_mutex); -} void Client::acknowledgeReception(ReceivedPacketPtr const& packet) { std::scoped_lock const lck(this->g_mutex); - this->g_acknowledgedPackets.push_back({packet->retrieveCounter().value(), packet->retrieveRealm().value()}); + this->g_acknowledgedPackets.emplace(packet->retrieveCounter().value(), packet->retrieveRealm().value()); } -std::vector const& Client::getAcknowledgedList() const +std::unordered_set const& Client::getAcknowledgedList() const { return this->g_acknowledgedPackets; } @@ -440,11 +427,11 @@ ClientStatus& Client::getStatus() return this->g_status; } -Client::CryptInfo const& Client::getCryptInfo() const +CryptInfo const& Client::getCryptInfo() const { return this->g_cryptInfo; } -Client::CryptInfo& Client::getCryptInfo() +CryptInfo& Client::getCryptInfo() { return this->g_cryptInfo; } diff --git a/sources/network/C_clientList.cpp b/sources/network/C_clientList.cpp index 9cc4cd40..24497861 100644 --- a/sources/network/C_clientList.cpp +++ b/sources/network/C_clientList.cpp @@ -45,13 +45,34 @@ void ClientList::sendToAll(TransmitPacketPtr const& pck) const } } +bool ClientList::moveTo(ClientList& targetList, Identity const& id) +{ + std::scoped_lock const lck(this->g_mutex); + auto it = this->g_data.find(id); + if (it == this->g_data.end()) + { + return false; + } + + std::scoped_lock const lckTarget(targetList.g_mutex); + auto itTarget = targetList.g_data.find(id); + if (itTarget != targetList.g_data.end()) + { + return false; + } + + targetList.g_data.emplace(id, std::move(it->second)); + this->g_data.erase(it); + return true; +} + void ClientList::add(Identity const& id, ClientSharedPtr const& newClient) { std::scoped_lock const lck(this->g_mutex); this->g_data.emplace(id, newClient); if (this->g_enableClientEventsFlag) { - this->g_events.push_back({ClientListEvent::CLEVT_NEWCLIENT, id}); + this->g_events.emplace_back(Event::Types::EVT_NEWCLIENT, id); } } void ClientList::remove(Identity const& id) @@ -60,7 +81,7 @@ void ClientList::remove(Identity const& id) this->g_data.erase(id); if (this->g_enableClientEventsFlag) { - this->g_events.push_back({ClientListEvent::CLEVT_DELCLIENT, id}); + this->g_events.emplace_back(Event::Types::EVT_DELCLIENT, id); } } ClientList::DataList::iterator ClientList::remove(DataList::const_iterator itPos, @@ -69,7 +90,7 @@ ClientList::DataList::iterator ClientList::remove(DataList::const_iterator itPos lock.throwIfDifferent(this->g_mutex); if (this->g_enableClientEventsFlag) { - this->g_events.push_back({ClientListEvent::CLEVT_DELCLIENT, itPos->first}); + this->g_events.emplace_back(Event::Types::EVT_DELCLIENT, itPos->first); } return this->g_data.erase(itPos); } @@ -147,13 +168,13 @@ bool ClientList::isWatchingEvent() const return this->g_enableClientEventsFlag; } -void ClientList::pushClientEvent(ClientListEvent const& evt) +void ClientList::pushClientEvent(Event const& evt) { std::scoped_lock const lck(this->g_mutex); this->g_events.push_back(evt); } -ClientListEvent const& ClientList::getClientEvent(std::size_t index) const +ClientList::Event const& ClientList::getClientEvent(std::size_t index) const { std::scoped_lock const lck(this->g_mutex); return this->g_events[index]; diff --git a/sources/network/C_netClient.cpp b/sources/network/C_netClient.cpp new file mode 100644 index 00000000..f22b09fb --- /dev/null +++ b/sources/network/C_netClient.cpp @@ -0,0 +1,706 @@ +/* + * Copyright 2025 Guillaume Guillet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FastEngine/manager/network_manager.hpp" +#include "FastEngine/network/C_server.hpp" +#include "private/fge_crypt.hpp" +#include "private/fge_debug.hpp" +#include +#include + +using namespace fge::priv; + +namespace fge::net +{ + +//ClientSideNetUdp +ClientSideNetUdp::ClientSideNetUdp(IpAddress::Types addressType) : + NetFluxUdp(false), + g_socket(addressType) +{ + this->_client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->resetReturnPacket(); +} + +ClientSideNetUdp::~ClientSideNetUdp() +{ + this->stop(); +} + +bool ClientSideNetUdp::start(Port bindPort, + IpAddress const& bindIp, + Port connectRemotePort, + IpAddress const& connectRemoteAddress, + IpAddress::Types addressType) +{ + if (this->g_running) + { + return false; + } + + this->resetReturnPacket(); + + this->g_socket.setAddressType(addressType); + if (addressType == IpAddress::Types::Ipv6) + { + this->g_socket.setIpv6Only(false); + } + else + { + this->g_socket.setDontFragment(true); + } + + this->_client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + + if (this->g_socket.bind(bindPort, bindIp) == Socket::Errors::ERR_NOERROR) + { + if (this->g_socket.connect(connectRemoteAddress, connectRemotePort) != Socket::Errors::ERR_NOERROR) + { + this->g_socket.close(); + return false; + } + + if (!CryptClientInit(this->g_crypt_ctx)) + { + this->g_socket.close(); + return false; + } + if (!CryptClientCreate(this->g_crypt_ctx, this->_client.getCryptInfo())) + { + this->g_socket.close(); + return false; + } + + this->g_clientIdentity._ip = connectRemoteAddress; + this->g_clientIdentity._port = connectRemotePort; + + this->g_running = true; + + this->g_threadReception = std::make_unique(&ClientSideNetUdp::threadReception, this); + this->g_threadTransmission = std::make_unique(&ClientSideNetUdp::threadTransmission, this); + + return true; + } + this->g_socket.close(); + return false; +} +void ClientSideNetUdp::stop() +{ + if (this->g_running) + { + this->disconnect().wait(); + + this->g_running = false; + + this->g_threadReception->join(); + this->g_threadTransmission->join(); + + this->g_threadReception = nullptr; + this->g_threadTransmission = nullptr; + + this->g_socket.close(); + + //Clear the flux + this->clearPackets(); + //Clear client + this->_client.clearPackets(); + this->_client.clearLostPacketCount(); + this->_client.setClientPacketCounter(0); + this->_client.setCurrentRealm(FGE_NET_DEFAULT_REALM); + this->_client.setCurrentPacketCounter(0); + + CryptClientDestroy(this->_client.getCryptInfo()); + CryptUninit(this->g_crypt_ctx); + } +} + +void ClientSideNetUdp::notifyTransmission() +{ + this->g_transmissionNotifier.notify_one(); +} + +IpAddress::Types ClientSideNetUdp::getAddressType() const +{ + return this->g_socket.getAddressType(); +} + +bool ClientSideNetUdp::isRunning() const +{ + return this->g_running; +} + +std::future ClientSideNetUdp::retrieveMTU() +{ + if (!this->g_running) + { + throw Exception("Cannot retrieve MTU without a running client"); + } + + auto command = std::make_unique(&this->g_clientContext._commands); + auto future = command->get_future(); + + std::scoped_lock const lock(this->g_mutexCommands); + this->g_clientContext._commands.push_back(std::move(command)); + + return future; +} +std::future ClientSideNetUdp::connect(std::string_view versioningString) +{ + if (!this->g_running) + { + throw Exception("Cannot connect without a running client"); + } + + auto command = std::make_unique(&this->g_clientContext._commands); + auto future = command->get_future(); + command->setVersioningString(versioningString); + + std::scoped_lock const lock(this->g_mutexCommands); + this->g_clientContext._commands.push_back(std::move(command)); + + return future; +} +std::future ClientSideNetUdp::disconnect() +{ + if (!this->g_running) + { + std::promise promise; + auto future = promise.get_future(); + promise.set_value(); + return future; + } + + this->enableReturnPacket(false); + + auto command = std::make_unique(&this->g_clientContext._commands); + auto future = command->get_future(); + + std::scoped_lock const lock(this->g_mutexCommands); + this->g_clientContext._commands.push_back(std::move(command)); + + return future; +} + +Identity const& ClientSideNetUdp::getClientIdentity() const +{ + return this->g_clientIdentity; +} +ClientContext const& ClientSideNetUdp::getClientContext() const +{ + return this->g_clientContext; +} +ClientContext& ClientSideNetUdp::getClientContext() +{ + return this->g_clientContext; +} + +FluxProcessResults ClientSideNetUdp::process(ReceivedPacketPtr& packet) +{ + ///TODO: no lock ? + packet.reset(); + + if (this->_client.getStatus().isDisconnected()) + { + this->_g_remainingPackets = 0; + return FluxProcessResults::NONE_AVAILABLE; + } + + //Checking timeout + if (this->_client.getStatus().isTimeout()) + { + this->_client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::TIMEOUT); + this->_g_remainingPackets = 0; + this->clearPackets(); + this->_onClientTimeout.call(*this); + return FluxProcessResults::NONE_AVAILABLE; + } + + //Checking return packet + if (this->isReturnPacketEnabled()) + { + auto const now = std::chrono::steady_clock::now(); + if (now - this->g_returnPacketTimePoint >= this->_client.getPacketReturnRate()) + { + this->g_returnPacketTimePoint = now; + auto returnPacket = this->prepareAndRetrieveReturnPacket(); + this->_onTransmitReturnPacket.call(*this, returnPacket); + this->_client.pushPacket(std::move(returnPacket)); + } + } + + if (this->_g_remainingPackets == 0) + { + this->_g_remainingPackets = this->getPacketsSize(); + return FluxProcessResults::NONE_AVAILABLE; + } + + //Popping the next packet + packet = this->popNextPacket(); + if (!packet) + { + this->_g_remainingPackets = this->getPacketsSize(); + return FluxProcessResults::NONE_AVAILABLE; + } + --this->_g_remainingPackets; + + if (!packet->isMarkedAsLocallyReordered()) + { + this->_client.acknowledgeReception(packet); + } + + auto const stat = PacketReorderer::checkStat(packet, this->_client.getCurrentPacketCounter(), + this->_client.getCurrentRealm()); + + if (!packet->checkFlags(FGE_NET_HEADER_DO_NOT_DISCARD_FLAG)) + { + if (stat == PacketReorderer::Stats::OLD_REALM || stat == PacketReorderer::Stats::OLD_COUNTER) + { +#ifdef FGE_DEF_DEBUG + auto const packetCounter = packet->retrieveCounter().value(); + auto const packetRealm = packet->retrieveRealm().value(); + auto const currentCounter = this->_client.getCurrentPacketCounter(); +#endif + FGE_DEBUG_PRINT("Packet is old, discarding it packetCounter: {}, packetRealm: {}, currentCounter: {}", + packetCounter, packetRealm, currentCounter); + this->_client.advanceLostPacketCount(); + return FluxProcessResults::INTERNALLY_DISCARDED; + } + } + + bool const doNotReorder = packet->checkFlags(FGE_NET_HEADER_DO_NOT_REORDER_FLAG); + if (!doNotReorder && !packet->isMarkedAsLocallyReordered()) + { + auto reorderResult = + this->processReorder(this->g_clientContext._reorderer, packet, this->_client.getCurrentPacketCounter(), + this->_client.getCurrentRealm(), false); + if (reorderResult != FluxProcessResults::USER_RETRIEVABLE) + { + return reorderResult; + } + } + + if (packet->retrieveHeaderId().value() == NET_INTERNAL_ID_DISCONNECT) + { + this->_client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->_g_remainingPackets = 0; + this->clearPackets(); + this->_onClientDisconnected.call(*this); + return FluxProcessResults::NONE_AVAILABLE; + } + + if (stat == PacketReorderer::Stats::WAITING_NEXT_REALM || stat == PacketReorderer::Stats::WAITING_NEXT_COUNTER) + { +#ifdef FGE_DEF_DEBUG + auto const packetCounter = packet->retrieveCounter().value(); + auto const packetRealm = packet->retrieveRealm().value(); + auto const currentCounter = this->_client.getCurrentPacketCounter(); +#endif + FGE_DEBUG_PRINT("We lose a packet packetCounter: {}, packetRealm: {}, currentCounter: {}", packetCounter, + packetRealm, currentCounter); + this->_client.advanceLostPacketCount(); //We are missing a packet + } + + if (!doNotReorder) + { + auto const serverCountId = packet->retrieveCounter().value(); + this->_client.setCurrentPacketCounter(serverCountId); + } + auto const serverRealm = packet->retrieveRealm().value(); + this->_client.setCurrentRealm(serverRealm); + + return FluxProcessResults::USER_RETRIEVABLE; +} + +void ClientSideNetUdp::resetReturnPacket() +{ + this->g_returnPacket = CreatePacket(NET_INTERNAL_ID_RETURN_PACKET); + this->g_returnPacket->append(sizeof(this->g_returnPacketEventCount)); + + this->g_returnPacketEventCount = 0; + this->g_isAskingFullUpdate = false; + this->g_returnPacketEventStarted = false; + this->g_returnPacketTimePoint = std::chrono::steady_clock::now(); +} + +TransmitPacketPtr& ClientSideNetUdp::startReturnEvent(ReturnEvents event) +{ + if (this->g_returnPacketEventStarted) + { + throw Exception("Cannot start a new return event without ending the previous one"); + } + this->g_returnPacketEventStarted = true; + ++this->g_returnPacketEventCount; + *this->g_returnPacket << event; + this->g_returnPacketStartPosition = this->g_returnPacket->getDataSize(); + this->g_returnPacket->append(sizeof(uint16_t)); + return this->g_returnPacket; +} + +TransmitPacketPtr& +ClientSideNetUdp::startObjectReturnEvent(uint16_t commandIndex, ObjectSid parentSid, ObjectSid targetSid) +{ + this->startReturnEvent(ReturnEvents::REVT_OBJECT); + *this->g_returnPacket << commandIndex << parentSid << targetSid; + return this->g_returnPacket; +} + +void ClientSideNetUdp::endReturnEvent() +{ + if (!this->g_returnPacketEventStarted) + { + throw Exception("Cannot end a return event without starting one"); + } + this->g_returnPacketEventStarted = false; + + //Rewrite event data size + uint16_t eventSize = this->g_returnPacket->getDataSize() - this->g_returnPacketStartPosition - sizeof(uint16_t); + this->g_returnPacket->pack(this->g_returnPacketStartPosition, &eventSize, sizeof(uint16_t)); +} + +void ClientSideNetUdp::simpleReturnEvent(uint16_t id) +{ + this->startReturnEvent(ReturnEvents::REVT_SIMPLE); + *this->g_returnPacket << id; + this->endReturnEvent(); +} + +bool ClientSideNetUdp::askFullUpdateReturnEvent() +{ + if (this->g_isAskingFullUpdate) + { + return false; + } + this->g_isAskingFullUpdate = true; + this->startReturnEvent(ReturnEvents::REVT_ASK_FULL_UPDATE); + this->endReturnEvent(); + return true; +} + +void ClientSideNetUdp::enableReturnPacket(bool enable) +{ + this->g_returnPacketEnabled = enable; + if (enable) + { + this->resetReturnPacket(); + } +} +bool ClientSideNetUdp::isReturnPacketEnabled() const +{ + return this->g_returnPacketEnabled; +} + +TransmitPacketPtr ClientSideNetUdp::prepareAndRetrieveReturnPacket() +{ + if (this->g_returnPacketEventStarted) + { + throw Exception("Cannot retrieve a return packet without ending the current command"); + } + + auto returnPacket = std::move(this->g_returnPacket); + + //Rewrite events count + returnPacket->packet().pack(ProtocolPacket::HeaderSize, &this->g_returnPacketEventCount, + sizeof(this->g_returnPacketEventCount)); + + //After return events, the packet is mostly composed of timestamp and latency information to limit bandwidth of packets. + //The LatencyPlanner class will do all the work for that. + this->_client._latencyPlanner.pack(returnPacket); + + //Pack acknowledged packets + auto const& acknowledgedPackets = this->_client.getAcknowledgedList(); + SizeType const size = acknowledgedPackets.size(); + returnPacket->packet() << size; + for (auto const& acknowledgedPacket: acknowledgedPackets) + { + returnPacket->packet() << acknowledgedPacket._counter << acknowledgedPacket._realm; + } + this->_client.clearAcknowledgedList(); + + //Prepare the new returnPacket + this->resetReturnPacket(); + + return returnPacket; +} +std::optional ClientSideNetUdp::loopbackReturnPacket(ReturnPacketHandler const& handler) +{ + TransmitPacketPtr transmitReturnPacket = this->prepareAndRetrieveReturnPacket(); + ReceivedPacketPtr returnPacket = + std::make_unique(std::move(*transmitReturnPacket), this->g_clientIdentity); + returnPacket->setReadPos(ProtocolPacket::HeaderSize); + + ClientSharedPtr const clientPtr = std::shared_ptr(&this->_client, [](Client*) {}); + + return handler.handleReturnPacket(clientPtr, this->g_clientContext, returnPacket); +} + +std::size_t ClientSideNetUdp::waitForPackets(std::chrono::milliseconds time_ms) +{ + std::unique_lock lock(this->_g_mutexFlux); + auto packetSize = this->_g_packets.size(); + if (packetSize > 0) + { + return packetSize; + } + + this->g_receptionNotifier.wait_for(lock, time_ms); + return this->_g_packets.size(); +} + +void ClientSideNetUdp::threadReception() +{ + Packet pckReceive; + CompressorLZ4 compressor; + + while (this->g_running) + { + if (this->g_socket.select(true, FGE_SERVER_PACKET_RECEPTION_TIMEOUT_MS) == Socket::Errors::ERR_NOERROR) + { + if (this->g_socket.receive(pckReceive) != Socket::Errors::ERR_NOERROR) + { + continue; + } + +#ifdef FGE_ENABLE_CLIENT_NETWORK_RANDOM_LOST + if (fge::_random.range(0, 1000) <= 10) + { + continue; + } +#endif + + //Decrypting the packet if needed + if (this->_client.getStatus().isInEncryptedState()) + { + if (!CryptDecrypt(this->_client, pckReceive)) + { + FGE_DEBUG_PRINT("CryptDecrypt failed"); + continue; + } + } + + auto packet = std::make_unique(std::move(pckReceive), this->g_clientIdentity); + packet->setTimestamp(Client::getTimestamp_ms()); + + //Here we consider that the packet is not encrypted + if (!packet->haveCorrectHeader()) + { + continue; + } + //Skip the header for reading + packet->skip(ProtocolPacket::HeaderSize); + + //Check if the packet is a fragment + if (packet->isFragmented()) + { + auto const result = this->g_clientContext._defragmentation.process(std::move(packet)); + if (result._result == PacketDefragmentation::Results::RETRIEVABLE) + { + packet = this->g_clientContext._defragmentation.retrieve(result._id, this->g_clientIdentity); + } + else + { + continue; + } + } + + //Decompress the packet if needed + if (!packet->decompress(compressor)) + { + FGE_DEBUG_PRINT("decompress failed"); + continue; + } + + //Check client status and reset timeout + auto const networkStatus = this->_client.getStatus().getNetworkStatus(); + if (networkStatus != ClientStatus::NetworkStatus::TIMEOUT) + { + //TODO : check if we need to reset the timeout + this->_client.getStatus().resetTimeout(); + } + + auto const headerId = packet->retrieveFullHeaderId().value(); + + //MTU handling + if (networkStatus == ClientStatus::NetworkStatus::ACKNOWLEDGED) + { + switch (headerId & ~FGE_NET_HEADER_FLAGS_MASK) + { + case NET_INTERNAL_ID_MTU_TEST: + { + auto response = CreatePacket(NET_INTERNAL_ID_MTU_TEST_RESPONSE); + response->doNotDiscard().doNotReorder(); + this->_client.pushPacket(std::move(response)); + this->_client.getStatus().resetTimeout(); + FGE_DEBUG_PRINT("received MTU test"); + continue; + } + case NET_INTERNAL_ID_MTU_ASK: + { + auto response = CreatePacket(NET_INTERNAL_ID_MTU_ASK_RESPONSE); + response->doNotDiscard().doNotReorder() << this->g_socket.retrieveCurrentAdapterMTU().value_or(0); + this->_client.pushPacket(std::move(response)); + this->_client.getStatus().resetTimeout(); + FGE_DEBUG_PRINT("received MTU ask"); + continue; + } + case NET_INTERNAL_ID_MTU_FINAL: + FGE_DEBUG_PRINT("received MTU final"); + this->_client._mtuFinalizedFlag = true; + this->_client.getStatus().resetTimeout(); + continue; + } + } + + //Checking commands + { + std::scoped_lock const commandLock(this->g_mutexCommands); + if (!this->g_clientContext._commands.empty()) + { + this->g_clientContext._commands.front()->onReceive(packet, this->g_socket.getAddressType(), + this->_client); + + //Commands can drop the packet + if (!packet) + { + continue; + } + } + } + + this->pushPacket(std::move(packet)); + this->g_receptionNotifier.notify_all(); + } + } +} +void ClientSideNetUdp::threadTransmission() +{ + auto lastTimePoint = std::chrono::steady_clock::now(); + std::chrono::milliseconds commandsTime{0}; + CompressorLZ4 compressor; + + std::unique_lock lckServer(this->_g_mutexFlux); + + while (this->g_running) + { + this->g_transmissionNotifier.wait_for(lckServer, std::chrono::milliseconds(10)); + + auto const now = std::chrono::steady_clock::now(); + auto const deltaTime = std::chrono::duration_cast(now - lastTimePoint); + lastTimePoint = now; + + //Checking commands + commandsTime += std::max(std::chrono::milliseconds{1}, deltaTime); + if (commandsTime >= FGE_NET_CMD_UPDATE_TICK_MS) + { + std::scoped_lock const commandLock(this->g_mutexCommands); + if (!this->g_clientContext._commands.empty()) + { + TransmitPacketPtr possiblePacket; + auto const result = this->g_clientContext._commands.front()->update( + possiblePacket, this->g_socket.getAddressType(), this->_client, commandsTime); + if (result == NetCommandResults::SUCCESS || result == NetCommandResults::FAILURE) + { + this->g_clientContext._commands.pop_front(); + } + + if (possiblePacket) + { + //Pushing the packet to the client queue + this->_client.pushPacket(std::move(possiblePacket)); + } + } + + commandsTime = std::chrono::milliseconds::zero(); + } + + //Flux + if (this->_client.isPendingPacketsEmpty()) + { + continue; + } + + if (this->_client.getLastPacketLatency() >= this->_client.getCTOSLatency_ms()) + { //Ready to send ! + auto transmissionPacket = this->_client.popPacket(); + + //Compression and applying options + if (!transmissionPacket->isFragmented() && this->_client.getStatus().isInEncryptedState()) + { + transmissionPacket->applyOptions(this->_client); + if (!transmissionPacket->compress(compressor)) + { + continue; + } + } + else + { + transmissionPacket->applyOptions(this->_client); + } + + //MTU check + if (!transmissionPacket->isFragmented() && + !transmissionPacket->checkFlags(FGE_NET_HEADER_DO_NOT_FRAGMENT_FLAG)) + { + //Packet is not fragmented, we have to check is size + if (this->_client.getMTU() == 0) + { //We don't know the MTU yet + goto mtu_check_skip; + } + + auto fragments = transmissionPacket->fragment(this->_client.getMTU()); + for (std::size_t i = 0; i < fragments.size(); ++i) + { + if (i == 0) + { + transmissionPacket = std::move(fragments[i]); + } + else + { + this->_client.pushForcedFrontPacket(std::move(fragments[i])); + } + } + } + mtu_check_skip: + + if (!transmissionPacket->packet() || !transmissionPacket->haveCorrectHeaderSize()) + { //Last verification of the packet + continue; + } + + //Check if the packet is a disconnection packet + if (transmissionPacket->retrieveHeaderId().value() == NET_INTERNAL_ID_DISCONNECT) + { + this->_client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->_client.getStatus().setTimeout(FGE_NET_STATUS_DEFAULT_TIMEOUT); + this->_client.clearPackets(); + } + + //Check if the packet must be encrypted + if (transmissionPacket->isMarkedForEncryption()) + { + if (!CryptEncrypt(this->_client, *transmissionPacket)) + { + continue; + } + } + + //Sending the packet + this->g_socket.send(transmissionPacket->packet()); + this->_client.resetLastPacketTimePoint(); + } + } +} + +} // namespace fge::net diff --git a/sources/network/C_netCommand.cpp b/sources/network/C_netCommand.cpp index c0a08183..406dc7e1 100644 --- a/sources/network/C_netCommand.cpp +++ b/sources/network/C_netCommand.cpp @@ -31,6 +31,11 @@ NetCommandResults NetCommand::update(TransmitPacketPtr& buffPacket, Client& client, std::chrono::milliseconds deltaTime) { + if (this->g_currentResultState != NetCommandResults::WORKING) + { + return this->g_currentResultState; + } + this->g_timeout += deltaTime; if (this->g_timeout >= this->getTimeoutTarget()) { @@ -52,7 +57,8 @@ NetCommandResults NetCommand::update(TransmitPacketPtr& buffPacket, } } - return this->internalUpdate(buffPacket, addressType, client); + this->internalUpdate(buffPacket, addressType, client, deltaTime); + return NetCommandResults::WORKING; } std::chrono::milliseconds NetCommand::getTimeoutTarget() const @@ -69,10 +75,26 @@ void NetCommand::resetTimeout() this->g_timeout = std::chrono::milliseconds::zero(); } +void NetCommand::markAsFailed() +{ + if (this->g_currentResultState == NetCommandResults::WORKING) + { + this->g_currentResultState = NetCommandResults::FAILURE; + } +} +void NetCommand::markAsSucceeded() +{ + if (this->g_currentResultState == NetCommandResults::WORKING) + { + this->g_currentResultState = NetCommandResults::SUCCESS; + } +} + //NetMTUCommand -NetCommandResults NetMTUCommand::internalUpdate(TransmitPacketPtr& buffPacket, - IpAddress::Types addressType, - [[maybe_unused]] Client& client) +void NetMTUCommand::internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + [[maybe_unused]] Client& client, + [[maybe_unused]] std::chrono::milliseconds deltaTime) { switch (this->g_state) { @@ -117,12 +139,10 @@ NetCommandResults NetMTUCommand::internalUpdate(TransmitPacketPtr& buffPacket, default: break; } - - return NetCommandResults::WORKING; } -NetCommandResults NetMTUCommand::onReceive(std::unique_ptr& packet, - IpAddress::Types addressType, - [[maybe_unused]] Client& client) +void NetMTUCommand::onReceive(std::unique_ptr& packet, + IpAddress::Types addressType, + [[maybe_unused]] Client& client) { switch (this->g_state) { @@ -138,7 +158,8 @@ NetCommandResults NetMTUCommand::onReceive(std::unique_ptr& pack //Invalid packet FGE_DEBUG_PRINT("MTU: Invalid packet"); this->g_promise.set_value(0); - return NetCommandResults::FAILURE; + this->markAsFailed(); + return; } FGE_DEBUG_PRINT("MTU: targetMTU: {}", targetMTU); @@ -163,7 +184,8 @@ NetCommandResults NetMTUCommand::onReceive(std::unique_ptr& pack { FGE_DEBUG_PRINT("MTU: currentMTU == maximumMTU"); this->g_promise.set_value(this->g_currentMTU); - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); + return; } //Compute a new target MTU @@ -192,7 +214,6 @@ NetCommandResults NetMTUCommand::onReceive(std::unique_ptr& pack this->resetTimeout(); this->g_state = States::DISCOVER; - return NetCommandResults::WORKING; } break; case States::DISCOVER: @@ -207,7 +228,15 @@ NetCommandResults NetMTUCommand::onReceive(std::unique_ptr& pack { FGE_DEBUG_PRINT(this->g_currentMTU == 0 ? "MTU: discovery failed" : "MTU: discovery ok"); this->g_promise.set_value(this->g_currentMTU); - return this->g_currentMTU == 0 ? NetCommandResults::FAILURE : NetCommandResults::SUCCESS; + if (this->g_currentMTU == 0) + { + this->markAsFailed(); + } + else + { + this->markAsSucceeded(); + } + return; } this->g_targetMTU += this->g_intervalMTU; @@ -225,8 +254,6 @@ NetCommandResults NetMTUCommand::onReceive(std::unique_ptr& pack default: break; } - - return NetCommandResults::WORKING; } NetCommandResults NetMTUCommand::timeout([[maybe_unused]] Client& client) { @@ -270,9 +297,10 @@ std::string const& NetConnectCommand::getVersioningString() const return this->g_versioningString; } -NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacket, - [[maybe_unused]] IpAddress::Types addressType, - Client& client) +void NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacket, + [[maybe_unused]] IpAddress::Types addressType, + Client& client, + [[maybe_unused]] std::chrono::milliseconds deltaTime) { switch (this->g_state) { @@ -299,7 +327,8 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke //MTU discovery failed client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); this->g_promise.set_value(false); - return NetCommandResults::FAILURE; + this->markAsFailed(); + return; } client.setMTU(mtu); FGE_DEBUG_PRINT("MTU discovery ok, now waiting for server to finish"); @@ -325,7 +354,7 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke { if (!client._mtuFinalizedFlag) { - return NetCommandResults::WORKING; + return; } FGE_DEBUG_PRINT("MTU finalized"); client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::MTU_DISCOVERED); @@ -347,7 +376,8 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke client.setCurrentPacketCounter(0); this->g_promise.set_value(true); this->g_state = States::CONNECTED; - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); + return; } auto const result = SSL_do_handshake(static_cast(info._ssl)); @@ -360,7 +390,8 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke ERR_print_errors_fp(stderr); client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); this->g_promise.set_value(false); - return NetCommandResults::FAILURE; + this->markAsFailed(); + return; } } @@ -373,7 +404,7 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke if (pendingSize == 0) { FGE_DEBUG_PRINT("no crypt handshake to transmit"); - return NetCommandResults::WORKING; + return; } FGE_DEBUG_PRINT("transmitting crypt"); @@ -389,7 +420,8 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke FGE_DEBUG_PRINT("failed crypt"); client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); this->g_promise.set_value(false); - return NetCommandResults::FAILURE; + this->markAsFailed(); + return; } FGE_DEBUG_PRINT("waiting response"); @@ -405,21 +437,19 @@ NetCommandResults NetConnectCommand::internalUpdate(TransmitPacketPtr& buffPacke client.setClientPacketCounter(0); client.setCurrentPacketCounter(0); this->g_promise.set_value(true); - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); } break; case States::CONNECTED: - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); default: break; } - - return NetCommandResults::WORKING; } -NetCommandResults NetConnectCommand::onReceive(std::unique_ptr& packet, - [[maybe_unused]] IpAddress::Types addressType, - Client& client) +void NetConnectCommand::onReceive(std::unique_ptr& packet, + [[maybe_unused]] IpAddress::Types addressType, + Client& client) { switch (this->g_state) { @@ -427,7 +457,7 @@ NetCommandResults NetConnectCommand::onReceive(std::unique_ptr& { if (packet->retrieveHeaderId().value() != NET_INTERNAL_ID_FGE_HANDSHAKE) { - return NetCommandResults::WORKING; + return; } std::unique_ptr packetOwned{std::move(packet)}; @@ -440,7 +470,8 @@ NetCommandResults NetConnectCommand::onReceive(std::unique_ptr& FGE_DEBUG_PRINT("handshake failed"); client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); this->g_promise.set_value(false); - return NetCommandResults::FAILURE; + this->markAsFailed(); + return; } if (handshake != FGE_NET_HANDSHAKE_STRING) @@ -448,7 +479,8 @@ NetCommandResults NetConnectCommand::onReceive(std::unique_ptr& FGE_DEBUG_PRINT("handshake failed"); client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); this->g_promise.set_value(false); - return NetCommandResults::FAILURE; + this->markAsFailed(); + return; } FGE_DEBUG_PRINT("RX ACKNOWLEDGED"); @@ -459,11 +491,12 @@ NetCommandResults NetConnectCommand::onReceive(std::unique_ptr& this->g_state = States::DEALING_WITH_MTU; } break; + case States::CRYPT_HANDSHAKE: case States::CRYPT_WAITING: { if (packet->retrieveHeaderId() != NET_INTERNAL_ID_CRYPT_HANDSHAKE) { - return NetCommandResults::WORKING; + return; } std::unique_ptr packetOwned{std::move(packet)}; @@ -481,12 +514,10 @@ NetCommandResults NetConnectCommand::onReceive(std::unique_ptr& } break; case States::CONNECTED: - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); default: break; } - - return NetCommandResults::WORKING; } NetCommandResults NetConnectCommand::timeout(Client& client) @@ -497,21 +528,257 @@ NetCommandResults NetConnectCommand::timeout(Client& client) return NetCommandResults::FAILURE; } +//NetConnectHandlerCommand + +void NetConnectHandlerCommand::internalUpdate(TransmitPacketPtr& buffPacket, + IpAddress::Types addressType, + Client& client, + std::chrono::milliseconds deltaTime) +{ + switch (this->g_state) + { + case States::LOOKUP_MTU: + client._mtuFinalizedFlag = false; + + //Determine if we need to do MTU discovery (from our side) + if (client.getMTU() == 0) + { + this->g_mtuFuture = this->g_mtuCommand.get_future(); + this->g_state = States::DEALING_WITH_MTU; + } + else + { + this->g_state = States::WAITING_CLIENT_FINAL_MTU; + } + break; + case States::DEALING_WITH_MTU: + { + auto result = this->g_mtuCommand.update(buffPacket, addressType, client, deltaTime); + + if (result == NetCommandResults::FAILURE) + { + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->g_promise.set_value(false); + this->markAsFailed(); + } + else if (result == NetCommandResults::SUCCESS) + { + auto const mtu = this->g_mtuFuture.get(); + + client.setMTU(mtu); + FGE_DEBUG_PRINT("MTU discovery ok, now waiting for client to finish"); + + auto response = CreatePacket(NET_INTERNAL_ID_MTU_FINAL); + response->doNotDiscard().doNotReorder().doNotFragment(); + client.pushPacket(std::move(response)); + + client.getStatus().resetTimeout(); + this->resetTimeout(); + this->g_state = States::WAITING_CLIENT_FINAL_MTU; + } + } + break; + case States::WAITING_CLIENT_FINAL_MTU: + if (!client._mtuFinalizedFlag) + { + return; + } + + FGE_DEBUG_PRINT("mtu finalized, starting crypt exchange"); + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::MTU_DISCOVERED); + client.getStatus().setTimeout(FGE_NET_STATUS_DEFAULT_TIMEOUT); + this->resetTimeout(); + this->g_state = States::CRYPT_HANDSHAKE; + break; + case States::CRYPT_HANDSHAKE: + { + auto& info = client.getCryptInfo(); + + if (SSL_is_init_finished(static_cast(info._ssl)) == 1) + { + FGE_DEBUG_PRINT("TX CONNECTED"); + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::CONNECTED); + client.getStatus().setTimeout(FGE_NET_STATUS_DEFAULT_CONNECTED_TIMEOUT); + client.setClientPacketCounter(0); + client.setCurrentPacketCounter(0); + this->g_promise.set_value(true); + this->g_state = States::CONNECTED; + this->markAsSucceeded(); + return; + } + + auto const result = SSL_do_handshake(static_cast(info._ssl)); + if (result <= 0) + { + auto const err = SSL_get_error(static_cast(info._ssl), result); + + if (err != SSL_ERROR_WANT_READ && err != SSL_ERROR_WANT_WRITE) + { + ERR_print_errors_fp(stderr); + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->g_promise.set_value(false); + this->markAsFailed(); + return; + } + } + + FGE_DEBUG_PRINT("check for transmit crypt"); + + this->resetTimeout(); + + // Check if OpenSSL has produced encrypted handshake data + auto const pendingSize = BIO_ctrl_pending(static_cast(info._wbio)); + if (pendingSize == 0) + { + FGE_DEBUG_PRINT("no crypt handshake to transmit"); + return; + } + + FGE_DEBUG_PRINT("transmitting crypt"); + buffPacket = CreatePacket(NET_INTERNAL_ID_CRYPT_HANDSHAKE); + + auto const packetStartDataPosition = buffPacket->doNotDiscard().getDataSize(); + buffPacket->append(pendingSize); + + auto const finalSize = + BIO_read(static_cast(info._wbio), buffPacket->getData() + packetStartDataPosition, pendingSize); + if (finalSize <= 0 || static_cast(finalSize) != pendingSize) + { + FGE_DEBUG_PRINT("failed crypt"); + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->g_promise.set_value(false); + this->markAsFailed(); + return; + } + + FGE_DEBUG_PRINT("waiting response"); + this->g_state = States::CRYPT_WAITING; + } + break; + case States::CRYPT_WAITING: + if (SSL_is_init_finished(static_cast(client.getCryptInfo()._ssl)) == 1) + { + FGE_DEBUG_PRINT("CONNECTED"); + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::CONNECTED); + client.getStatus().setTimeout(FGE_NET_STATUS_DEFAULT_CONNECTED_TIMEOUT); + client.setClientPacketCounter(0); + client.setCurrentPacketCounter(0); + this->g_promise.set_value(true); + this->markAsSucceeded(); + } + break; + case States::CONNECTED: + this->markAsSucceeded(); + default: + break; + } +} + +void NetConnectHandlerCommand::onReceive(std::unique_ptr& packet, + [[maybe_unused]] IpAddress::Types addressType, + Client& client) +{ + switch (this->g_state) + { + case States::DEALING_WITH_MTU: + { + this->g_mtuCommand.onReceive(packet, addressType, client); + + if (!packet) + { + return; + } + } + [[fallthrough]]; + case States::WAITING_CLIENT_FINAL_MTU: + //At this point, the client still need to do the MTU discovery + switch (packet->retrieveHeaderId().value()) + { + case NET_INTERNAL_ID_MTU_TEST: + { + auto response = CreatePacket(NET_INTERNAL_ID_MTU_TEST_RESPONSE); + response->doNotDiscard().doNotReorder(); + client.pushPacket(std::move(response)); + client.getStatus().resetTimeout(); + FGE_DEBUG_PRINT("received MTU test"); + packet.reset(); + break; + } + case NET_INTERNAL_ID_MTU_ASK: + { + auto response = CreatePacket(NET_INTERNAL_ID_MTU_ASK_RESPONSE); + response->doNotDiscard().doNotReorder() + << SocketUdp::retrieveAdapterMTUForDestination(packet->getIdentity()._ip).value_or(0); + client.pushPacket(std::move(response)); + client.getStatus().resetTimeout(); + FGE_DEBUG_PRINT("received MTU ask"); + packet.reset(); + break; + } + case NET_INTERNAL_ID_MTU_FINAL: //TODO: need a timeout for the MTU final + FGE_DEBUG_PRINT("received MTU final"); + client._mtuFinalizedFlag = true; + client.getStatus().resetTimeout(); + packet.reset(); + break; + default: + break; + } + break; + case States::CRYPT_HANDSHAKE: + case States::CRYPT_WAITING: + { + if (packet->retrieveHeaderId() != NET_INTERNAL_ID_CRYPT_HANDSHAKE) + { + return; + } + + std::unique_ptr packetOwned{std::move(packet)}; + + auto& info = client.getCryptInfo(); + + auto const readPos = packetOwned->getReadPos(); + BIO_write(static_cast(info._rbio), packetOwned->getData() + readPos, + packetOwned->getDataSize() - readPos); + + FGE_DEBUG_PRINT("Crypt: received some data"); + + this->resetTimeout(); + this->g_state = States::CRYPT_HANDSHAKE; + } + break; + case States::CONNECTED: + this->markAsSucceeded(); + default: + break; + } +} + +NetCommandResults NetConnectHandlerCommand::timeout(Client& client) +{ + FGE_DEBUG_PRINT("connect: timeout"); + client.getStatus().setNetworkStatus(ClientStatus::NetworkStatus::DISCONNECTED); + this->g_promise.set_value(false); + return NetCommandResults::FAILURE; +} + //NetDisconnectCommand -NetCommandResults NetDisconnectCommand::internalUpdate(TransmitPacketPtr& buffPacket, - [[maybe_unused]] IpAddress::Types addressType, - Client& client) +void NetDisconnectCommand::internalUpdate(TransmitPacketPtr& buffPacket, + [[maybe_unused]] IpAddress::Types addressType, + Client& client, + [[maybe_unused]] std::chrono::milliseconds deltaTime) { if (client.getStatus().getNetworkStatus() == ClientStatus::NetworkStatus::DISCONNECTED) { this->g_promise.set_value(); - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); + return; } if (this->g_transmitted) { - return NetCommandResults::WORKING; + return; } client.clearPackets(); @@ -520,15 +787,13 @@ NetCommandResults NetDisconnectCommand::internalUpdate(TransmitPacketPtr& buffPa buffPacket->doNotDiscard().doNotReorder().doNotFragment(); this->g_transmitted = true; - return NetCommandResults::SUCCESS; + this->markAsSucceeded(); } -NetCommandResults NetDisconnectCommand::onReceive([[maybe_unused]] std::unique_ptr& packet, - [[maybe_unused]] IpAddress::Types addressType, - [[maybe_unused]] Client& client) -{ - return NetCommandResults::WORKING; -} +void NetDisconnectCommand::onReceive([[maybe_unused]] std::unique_ptr& packet, + [[maybe_unused]] IpAddress::Types addressType, + [[maybe_unused]] Client& client) +{} NetCommandResults NetDisconnectCommand::timeout([[maybe_unused]] Client& client) { diff --git a/sources/network/C_netServer.cpp b/sources/network/C_netServer.cpp new file mode 100644 index 00000000..2953e470 --- /dev/null +++ b/sources/network/C_netServer.cpp @@ -0,0 +1,480 @@ +/* + * Copyright 2025 Guillaume Guillet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FastEngine/manager/network_manager.hpp" +#include "FastEngine/network/C_server.hpp" +#include "private/fge_crypt.hpp" +#include "private/fge_debug.hpp" +#include +#include + +using namespace fge::priv; + +namespace fge::net +{ + +//ServerSideNetUdp +ServerSideNetUdp::ServerSideNetUdp(IpAddress::Types type) : + g_threadReception(nullptr), + g_threadTransmission(nullptr), + g_defaultFlux(*this, true), + g_socket(type), + g_running(false) +{} +ServerSideNetUdp::~ServerSideNetUdp() +{ + this->stop(); +} + +void ServerSideNetUdp::setVersioningString(std::string_view versioningString) +{ + std::scoped_lock const lock{this->g_mutexServer}; + this->g_versioningString = versioningString; +} +std::string const& ServerSideNetUdp::getVersioningString() const +{ + return this->g_versioningString; +} + +bool ServerSideNetUdp::start(Port bindPort, IpAddress const& bindIp, IpAddress::Types addressType) +{ + if (this->g_running) + { + return false; + } + + this->g_socket.setAddressType(addressType); + if (this->g_socket.bind(bindPort, bindIp) == Socket::Errors::ERR_NOERROR) + { + if (!CryptServerInit(this->g_crypt_ctx)) + { + this->g_socket.close(); + return false; + } + + this->g_running = true; + + this->g_threadReception = std::make_unique(&ServerSideNetUdp::threadReception, this); + this->g_threadTransmission = std::make_unique(&ServerSideNetUdp::threadTransmission, this); + + return true; + } + return false; +} +bool ServerSideNetUdp::start(IpAddress::Types addressType) +{ + if (this->g_running) + { + return false; + } + this->g_socket.setAddressType(addressType); + if (this->g_socket.isValid()) + { + if (!CryptServerInit(this->g_crypt_ctx)) + { + this->g_socket.close(); + return false; + } + + this->g_running = true; + + this->g_threadReception = std::make_unique(&ServerSideNetUdp::threadReception, this); + this->g_threadTransmission = std::make_unique(&ServerSideNetUdp::threadTransmission, this); + + return true; + } + return false; +} +void ServerSideNetUdp::stop() +{ + if (this->g_running) + { + this->g_running = false; + + this->g_threadReception->join(); + this->g_threadTransmission->join(); + + this->g_threadReception = nullptr; + this->g_threadTransmission = nullptr; + + this->g_socket.close(); + + //Clear the flux + std::scoped_lock const lock(this->g_mutexServer); + for (auto& flux: this->g_fluxes) + { + flux->clearPackets(); + } + this->g_defaultFlux.clearPackets(); + decltype(this->g_transmissionQueue)().swap(this->g_transmissionQueue); + + CryptUninit(this->g_crypt_ctx); + } +} + +ServerNetFluxUdp* ServerSideNetUdp::newFlux() +{ + std::scoped_lock const lock(this->g_mutexServer); + + this->g_fluxes.push_back(std::make_unique(*this, false)); + return this->g_fluxes.back().get(); +} +ServerNetFluxUdp* ServerSideNetUdp::getFlux(std::size_t index) +{ + std::scoped_lock const lock(this->g_mutexServer); + + if (index >= this->g_fluxes.size()) + { + return nullptr; + } + return this->g_fluxes[index].get(); +} +ServerNetFluxUdp* ServerSideNetUdp::getDefaultFlux() +{ + return &this->g_defaultFlux; +} +std::size_t ServerSideNetUdp::getFluxSize() const +{ + return this->g_fluxes.size(); +} +IpAddress::Types ServerSideNetUdp::getAddressType() const +{ + return this->g_socket.getAddressType(); +} +void ServerSideNetUdp::closeFlux(NetFluxUdp* flux) +{ + std::scoped_lock const lock(this->g_mutexServer); + + for (std::size_t i = 0; i < this->g_fluxes.size(); ++i) + { + if (this->g_fluxes[i].get() == flux) + { + this->g_fluxes.erase(this->g_fluxes.begin() + i); + break; + } + } +} +void ServerSideNetUdp::closeAllFlux() +{ + std::scoped_lock const lock(this->g_mutexServer); + this->g_fluxes.clear(); +} + +void ServerSideNetUdp::repushPacket(ReceivedPacketPtr&& packet) +{ + if (!packet->checkFluxLifetime(this->g_fluxes.size())) + { + this->g_defaultFlux.pushPacket(std::move(packet)); + return; + } + this->g_fluxes[packet->getFluxIndex()]->forcePushPacket(std::move(packet)); +} + +void ServerSideNetUdp::notifyTransmission() +{ + this->g_transmissionNotifier.notify_one(); +} + +bool ServerSideNetUdp::isRunning() const +{ + return this->g_running; +} + +[[nodiscard]] bool ServerSideNetUdp::announceNewClient(Identity const& identity, ClientSharedPtr const& client) +{ + std::scoped_lock const lock(this->g_mutexServer); + + auto result = this->g_clientsMap.emplace(identity, client); + if (!result.second) + { + if (!result.first->second.expired()) + { + return result.first->second.lock() == client; + } + result.first->second = client; + } + return true; +} + +void ServerSideNetUdp::sendTo(TransmitPacketPtr& pck, Client const& client, Identity const& id) +{ + pck->applyOptions(client); + pck->doNotReorder(); + + { + std::scoped_lock const lock(this->g_mutexServer); + this->g_transmissionQueue.emplace(std::move(pck), id); + } + this->g_transmissionNotifier.notify_one(); +} +void ServerSideNetUdp::sendTo(TransmitPacketPtr& pck, Identity const& id) +{ + pck->applyOptions(); + pck->doNotReorder(); + + { + std::scoped_lock const lock(this->g_mutexServer); + this->g_transmissionQueue.emplace(std::move(pck), id); + } + this->g_transmissionNotifier.notify_one(); +} + +void* ServerSideNetUdp::getCryptContext() const +{ + return this->g_crypt_ctx; +} + +void ServerSideNetUdp::threadReception() +{ + Identity idReceive; + Packet pckReceive; + std::size_t pushingIndex = 0; + auto gcClientsMap = std::chrono::steady_clock::now(); + + CompressorLZ4 compressor; + + while (this->g_running) + { + if (this->g_socket.select(true, FGE_SERVER_PACKET_RECEPTION_TIMEOUT_MS) == Socket::Errors::ERR_NOERROR) + { + if (this->g_socket.receiveFrom(pckReceive, idReceive._ip, idReceive._port) == Socket::Errors::ERR_NOERROR) + { +#ifdef FGE_ENABLE_SERVER_NETWORK_RANDOM_LOST + if (fge::_random.range(0, 1000) <= 10) + { + continue; + } +#endif + + auto packet = std::make_unique(std::move(pckReceive), idReceive); + packet->setTimestamp(Client::getTimestamp_ms()); + + std::scoped_lock const lck(this->g_mutexServer); + + auto itClient = this->g_clientsMap.find(idReceive); + if (itClient != this->g_clientsMap.end()) + { + auto client = itClient->second.lock(); + if (!client) + { //bad client + this->g_clientsMap.erase(itClient); + } + else + { + //Check if the packet is encrypted + if (client->getStatus().isInEncryptedState()) + { + if (!CryptDecrypt(*client, *packet)) + { + continue; + } + } + } + } + + //Here we consider that the packet is not encrypted + if (!packet->haveCorrectHeader()) + { + continue; + } + //Skip the header for reading + packet->skip(ProtocolPacket::HeaderSize); + + //Decompress the packet if needed + if (!packet->decompress(compressor)) + { + continue; + } + + //Realm and countId is verified by the flux + + if (this->g_fluxes.empty()) + { + this->g_defaultFlux.pushPacket(std::move(packet)); + continue; + } + + //Try to push packet in a flux + for (std::size_t i = 0; i < this->g_fluxes.size(); ++i) + { + pushingIndex = packet->bumpFluxIndex(this->g_fluxes.size()); + if (this->g_fluxes[pushingIndex]->pushPacket(std::move(packet))) + { //Packet is correctly pushed + break; + } + } + //If every flux is busy, the new packet is dismissed + } + } + + //"Garbage collection" of the clients map + auto const now = std::chrono::steady_clock::now(); + if (std::chrono::duration_cast(now - gcClientsMap) >= + std::chrono::milliseconds{FGE_SERVER_CLIENTS_MAP_GC_DELAY_MS}) + { + gcClientsMap = now; + + for (auto it = this->g_clientsMap.begin(); it != this->g_clientsMap.end();) + { + if (it->second.expired()) + { + it = this->g_clientsMap.erase(it); + continue; + } + ++it; + } + } + } +} +void ServerSideNetUdp::threadTransmission() +{ + std::unique_lock lckServer(this->g_mutexServer); + CompressorLZ4 compressor; + std::chrono::steady_clock::time_point timePoint; + + while (this->g_running) + { + this->g_transmissionNotifier.wait_for(lckServer, std::chrono::milliseconds(10)); + + //Checking fluxes + for (std::size_t i = 0; i < this->g_fluxes.size() + 1; ++i) + { + ClientList* clients{nullptr}; + if (i == this->g_fluxes.size()) + { //Doing the default flux + clients = &this->g_defaultFlux._clients; + } + else + { + clients = &this->g_fluxes[i]->_clients; + } + + timePoint = std::chrono::steady_clock::now(); + auto clientLock = clients->acquireLock(); + + for (auto itClient = clients->begin(clientLock); itClient != clients->end(clientLock); ++itClient) + { + auto& clientData = itClient->second; + auto& client = clientData._client; + + //check cache + { + auto const clientLatency = + client->getPacketReturnRate() * FGE_NET_PACKET_CACHE_DELAY_FACTOR + + std::chrono::milliseconds(client->_latencyPlanner.getRoundTripTime().value_or(1)); + + while (clientData._context._cache.check( + timePoint, std::chrono::duration_cast(clientLatency))) + { + FGE_DEBUG_PRINT("re-transmit packet as client didn't acknowledge it"); + client->pushForcedFrontPacket(clientData._context._cache.pop()); + } + } + + if (client->isPendingPacketsEmpty()) + { + continue; + } + + if (client->getLastPacketLatency() < client->getSTOCLatency_ms()) + { + continue; + } + + auto transmissionPacket = client->popPacket(); + + if (!transmissionPacket->isMarkedAsCached()) + { + //Compression and applying options + transmissionPacket->applyOptions(*client); + if (!transmissionPacket->isFragmented()) + { + if (client->getStatus().isInEncryptedState()) + { + if (!transmissionPacket->compress(compressor)) + { + continue; + } + } + clientData._context._cache.push(transmissionPacket); + } + } + + //MTU check + if (!transmissionPacket->isFragmented() && + !transmissionPacket->checkFlags(FGE_NET_HEADER_DO_NOT_FRAGMENT_FLAG)) + { + auto const mtu = client->getMTU(); + + //Packet is not fragmented, we have to check is size + if (mtu == 0) + { //We don't know the MTU yet + goto mtu_check_skip; + } + + auto fragments = transmissionPacket->fragment(mtu); + for (std::size_t iFragment = 0; iFragment < fragments.size(); ++iFragment) + { + if (iFragment == 0) + { + transmissionPacket = std::move(fragments[iFragment]); + } + else + { + client->pushForcedFrontPacket(std::move(fragments[iFragment])); + } + } + } + mtu_check_skip: + + if (!transmissionPacket->packet() || !transmissionPacket->haveCorrectHeaderSize()) + { //Last verification of the packet + continue; + } + + //Check if the packet must be encrypted + if (transmissionPacket->isMarkedForEncryption()) + { + if (!CryptEncrypt(*client, *transmissionPacket)) + { + continue; + } + } + + //Sending the packet + this->g_socket.sendTo(transmissionPacket->packet(), itClient->first._ip, itClient->first._port); + client->resetLastPacketTimePoint(); + } + } + + //Checking isolated transmission queue TODO: maybe remove all that + while (!this->g_transmissionQueue.empty()) + { + auto data = std::move(this->g_transmissionQueue.front()); + this->g_transmissionQueue.pop(); + + if (!data.first->packet() || !data.first->haveCorrectHeaderSize()) + { //Last verification of the packet + continue; + } + + //Sending the packet + this->g_socket.sendTo(data.first->packet(), data.second._ip, data.second._port); + } + } +} + +} // namespace fge::net diff --git a/sources/network/C_networkType.cpp b/sources/network/C_networkType.cpp index 83c29ad7..d0e8d9b6 100644 --- a/sources/network/C_networkType.cpp +++ b/sources/network/C_networkType.cpp @@ -168,7 +168,7 @@ void PerClientSyncContext::clientsCheckup(ClientList const& clients, { auto const& evt = clients.getClientEvent(i); - if (evt._event == ClientListEvent::CLEVT_DELCLIENT) + if (evt._event == ClientList::Event::Types::EVT_DELCLIENT) { this->delClient(evt._id); } diff --git a/sources/network/C_protocol.cpp b/sources/network/C_protocol.cpp index 22091a9a..eb0cdad6 100644 --- a/sources/network/C_protocol.cpp +++ b/sources/network/C_protocol.cpp @@ -411,8 +411,46 @@ PacketReorderer::Stats PacketReorderer::Data::checkStat(ProtocolPacket::CounterT //PacketCache +PacketCache::PacketCache(PacketCache&& r) noexcept : + PacketCache() +{ + std::scoped_lock const lockThis(this->g_mutex); + std::scoped_lock const lockR(r.g_mutex); + + this->g_cache = std::move(r.g_cache); + this->g_start = r.g_start; + this->g_end = r.g_end; + this->g_enable = r.g_enable; + + r.g_cache.clear(); + r.g_cache.resize(FGE_NET_PACKET_CACHE_MAX); + r.g_start = 0; + r.g_end = 0; +} + +PacketCache& PacketCache::operator=(PacketCache&& r) noexcept +{ + if (this != &r) + { + std::scoped_lock const lockThis(this->g_mutex); + std::scoped_lock const lockR(r.g_mutex); + + this->g_cache = std::move(r.g_cache); + this->g_start = r.g_start; + this->g_end = r.g_end; + this->g_enable = r.g_enable; + + r.g_cache.clear(); + r.g_cache.resize(FGE_NET_PACKET_CACHE_MAX); + r.g_start = 0; + r.g_end = 0; + } + return *this; +} + void PacketCache::clear() { + std::scoped_lock const lock(this->g_mutex); this->g_cache.clear(); this->g_cache.resize(FGE_NET_PACKET_CACHE_MAX); this->g_start = 0; @@ -421,11 +459,28 @@ void PacketCache::clear() bool PacketCache::isEmpty() const { + std::scoped_lock const lock(this->g_mutex); return this->g_start == this->g_end; } +bool PacketCache::isEnabled() const +{ + std::scoped_lock const lock(this->g_mutex); + return this->g_enable; +} +void PacketCache::enable(bool enable) +{ + std::scoped_lock const lock(this->g_mutex); + this->g_enable = enable; +} void PacketCache::push(TransmitPacketPtr const& packet) { + std::scoped_lock const lock(this->g_mutex); + if (!this->g_enable) + { + return; + } + auto const next = (this->g_end + 1) % FGE_NET_PACKET_CACHE_MAX; if (next == this->g_start) @@ -452,6 +507,8 @@ void PacketCache::acknowledgeReception(std::span