From 3274051505c7a62d5970e9a37aa8ae2b152c8304 Mon Sep 17 00:00:00 2001 From: Konstantin Akimov Date: Wed, 17 Dec 2025 01:48:01 +0700 Subject: [PATCH 1/3] refactor: use NetHandler for CoinJoinServer to avoid multiple circular dependencies Co-authored-by: UdjinM6 --- src/coinjoin/server.cpp | 92 +++++++++++++------------ src/coinjoin/server.h | 17 ++--- src/init.cpp | 13 ++-- src/masternode/active/context.cpp | 12 ++-- src/masternode/active/context.h | 11 ++- src/net_processing.cpp | 24 +++++-- src/net_processing.h | 3 +- src/rpc/coinjoin.cpp | 2 +- test/lint/lint-circular-dependencies.py | 2 +- 9 files changed, 95 insertions(+), 81 deletions(-) diff --git a/src/coinjoin/server.cpp b/src/coinjoin/server.cpp index 98069a79cf11..66217340ee64 100644 --- a/src/coinjoin/server.cpp +++ b/src/coinjoin/server.cpp @@ -23,17 +23,17 @@ #include -CCoinJoinServer::CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman, CDeterministicMNManager& dmnman, - CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, +CCoinJoinServer::CCoinJoinServer(PeerManagerInternal* peer_manager, ChainstateManager& chainman, CConnman& _connman, + CDeterministicMNManager& dmnman, CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, + CTxMemPool& mempool, const CActiveMasternodeManager& mn_activeman, const CMasternodeSync& mn_sync, const llmq::CInstantSendManager& isman) : + NetHandler(peer_manager), m_chainman{chainman}, connman{_connman}, m_dmnman{dmnman}, m_dstxman{dstxman}, m_mn_metaman{mn_metaman}, mempool{mempool}, - m_peerman{peerman}, m_mn_activeman{mn_activeman}, m_mn_sync{mn_sync}, m_isman{isman}, @@ -44,20 +44,19 @@ CCoinJoinServer::CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman CCoinJoinServer::~CCoinJoinServer() = default; -MessageProcessingResult CCoinJoinServer::ProcessMessage(CNode& peer, std::string_view msg_type, CDataStream& vRecv) +void CCoinJoinServer::ProcessMessage(CNode& peer, const std::string& msg_type, CDataStream& vRecv) { - if (!m_mn_sync.IsBlockchainSynced()) return {}; + if (!m_mn_sync.IsBlockchainSynced()) return; if (msg_type == NetMsgType::DSACCEPT) { ProcessDSACCEPT(peer, vRecv); } else if (msg_type == NetMsgType::DSQUEUE) { - return ProcessDSQUEUE(peer.GetId(), vRecv); + ProcessDSQUEUE(peer.GetId(), vRecv); } else if (msg_type == NetMsgType::DSVIN) { ProcessDSVIN(peer, vRecv); } else if (msg_type == NetMsgType::DSSIGNFINALTX) { ProcessDSSIGNFINALTX(vRecv); } - return {}; } void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv) @@ -126,26 +125,25 @@ void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv) } } -MessageProcessingResult CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream& vRecv) +void CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream& vRecv) { assert(m_mn_metaman.IsValid()); CCoinJoinQueue dsq; vRecv >> dsq; - MessageProcessingResult ret{}; - ret.m_to_erase = CInv{MSG_DSQ, dsq.GetHash()}; + WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(from, CInv{MSG_DSQ, dsq.GetHash()})); // Validate denomination first if (!CoinJoin::IsValidDenomination(dsq.nDenom)) { LogPrint(BCLog::COINJOIN, "DSQUEUE -- invalid denomination %d from peer %d\n", dsq.nDenom, from); - ret.m_error = MisbehavingError{10}; - return ret; + m_peer_manager->PeerMisbehaving(from, 10); + return; } if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) { - ret.m_error = MisbehavingError{100}; - return ret; + m_peer_manager->PeerMisbehaving(from, 100); + return; } const auto tip_mn_list = m_dmnman.GetListAtChainTip(); @@ -153,60 +151,59 @@ MessageProcessingResult CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream if (auto dmn = tip_mn_list.GetValidMN(dsq.m_protxHash)) { dsq.masternodeOutpoint = dmn->collateralOutpoint; } else { - ret.m_error = MisbehavingError{10}; - return ret; + m_peer_manager->PeerMisbehaving(from, 10); + return; } } { TRY_LOCK(cs_vecqueue, lockRecv); - if (!lockRecv) return ret; + if (!lockRecv) return; // process every dsq only once for (const auto& q : vecCoinJoinQueue) { if (q == dsq) { - return ret; + return; } if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) { // no way the same mn can send another dsq with the same readiness this soon LogPrint(BCLog::COINJOIN, "DSQUEUE -- Peer %d is sending WAY too many dsq messages for a masternode with collateral %s\n", from, dsq.masternodeOutpoint.ToStringShort()); - return ret; + return; } } } // cs_vecqueue LogPrint(BCLog::COINJOIN, "DSQUEUE -- %s new\n", dsq.ToString()); - if (dsq.IsTimeOutOfBounds()) return ret; + if (dsq.IsTimeOutOfBounds()) return; auto dmn = tip_mn_list.GetValidMNByCollateral(dsq.masternodeOutpoint); - if (!dmn) return ret; + if (!dmn) return; if (dsq.m_protxHash.IsNull()) { dsq.m_protxHash = dmn->proTxHash; } if (!dsq.CheckSignature(dmn->pdmnState->pubKeyOperator.Get())) { - ret.m_error = MisbehavingError{10}; - return ret; + m_peer_manager->PeerMisbehaving(from, 10); + return; } if (!dsq.fReady) { //don't allow a few nodes to dominate the queuing process if (m_mn_metaman.IsMixingThresholdExceeded(dmn->proTxHash, tip_mn_list.GetValidMNsCount())) { LogPrint(BCLog::COINJOIN, "DSQUEUE -- node sending too many dsq messages, masternode=%s\n", dmn->proTxHash.ToString()); - return ret; + return; } m_mn_metaman.AllowMixing(dmn->proTxHash); LogPrint(BCLog::COINJOIN, "DSQUEUE -- new CoinJoin queue, masternode=%s, queue=%s\n", dmn->proTxHash.ToString(), dsq.ToString()); TRY_LOCK(cs_vecqueue, lockRecv); - if (!lockRecv) return ret; + if (!lockRecv) return; vecCoinJoinQueue.push_back(dsq); - ret.m_dsq.push_back(dsq); + m_peer_manager->PeerRelayDSQ(dsq); } - return ret; } void CCoinJoinServer::ProcessDSVIN(CNode& peer, CDataStream& vRecv) @@ -275,7 +272,8 @@ void CCoinJoinServer::SetNull() // void CCoinJoinServer::CheckPool() { - if (int entries = GetEntriesCount(); entries != 0) LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckPool -- entries count %lu\n", entries); + if (int entries = GetEntriesCount(); entries != 0) + LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckPool -- entries count %lu\n", entries); // If we have an entry for each collateral, then create final tx if (nState == POOL_STATE_ACCEPTING_ENTRIES && size_t(GetEntriesCount()) == vecSessionCollaterals.size()) { @@ -286,8 +284,8 @@ void CCoinJoinServer::CheckPool() // Check for Time Out // If we timed out while accepting entries, then if we have more than minimum, create final tx - if (nState == POOL_STATE_ACCEPTING_ENTRIES && CCoinJoinServer::HasTimedOut() - && GetEntriesCount() >= CoinJoin::GetMinPoolParticipants()) { + if (nState == POOL_STATE_ACCEPTING_ENTRIES && CCoinJoinServer::HasTimedOut() && + GetEntriesCount() >= CoinJoin::GetMinPoolParticipants()) { // Punish misbehaving participants ChargeFees(); // Try to complete this session ignoring the misbehaving ones @@ -326,7 +324,8 @@ void CCoinJoinServer::CreateFinalTransaction() sort(txNew.vout.begin(), txNew.vout.end(), CompareOutputBIP69()); finalMutableTransaction = txNew; - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateFinalTransaction -- finalMutableTransaction=%s", txNew.ToString()); /* Continued */ + LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateFinalTransaction -- finalMutableTransaction=%s", /* Continued */ + txNew.ToString()); // request signatures from clients SetState(POOL_STATE_SIGNING); @@ -340,14 +339,16 @@ void CCoinJoinServer::CommitFinalTransaction() CTransactionRef finalTransaction = WITH_LOCK(cs_coinjoin, return MakeTransactionRef(finalMutableTransaction)); uint256 hashTx = finalTransaction->GetHash(); - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- finalTransaction=%s", finalTransaction->ToString()); /* Continued */ + LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- finalTransaction=%s", /* Continued */ + finalTransaction->ToString()); { // See if the transaction is valid TRY_LOCK(::cs_main, lockMain); mempool.PrioritiseTransaction(hashTx, 0.1 * COIN); if (!lockMain || !ATMPIfSaneFee(m_chainman, finalTransaction)) { - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- ATMPIfSaneFee() error: Transaction not valid\n"); + LogPrint(BCLog::COINJOIN, /* Continued */ + "CCoinJoinServer::CommitFinalTransaction -- ATMPIfSaneFee() error: Transaction not valid\n"); WITH_LOCK(cs_coinjoin, SetNull()); // not much we can do in this case, just notify clients RelayCompletedTransaction(ERR_INVALID_TX); @@ -368,7 +369,7 @@ void CCoinJoinServer::CommitFinalTransaction() LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- TRANSMITTING DSTX\n"); CInv inv(MSG_DSTX, hashTx); - m_peerman.RelayInv(inv); + m_peer_manager->PeerRelayInv(inv); // Tell the clients it was successful RelayCompletedTransaction(MSG_SUCCESS); @@ -411,7 +412,9 @@ void CCoinJoinServer::ChargeFees() const // This queue entry didn't send us the promised transaction if (!fFound) { - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't send transaction), found offence\n"); + LogPrint(BCLog::COINJOIN, /* Continued */ + "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't send transaction), found " + "offence\n"); vecOffendersCollaterals.push_back(txCollateral); } } @@ -423,7 +426,8 @@ void CCoinJoinServer::ChargeFees() const for (const auto& entry : vecEntries) { for (const auto& txdsin : entry.vecTxDSIn) { if (!txdsin.fHasSig) { - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't sign), found offence\n"); + LogPrint(BCLog::COINJOIN, /* Continued */ + "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't sign), found offence\n"); vecOffendersCollaterals.push_back(entry.txCollateral); } } @@ -443,8 +447,9 @@ void CCoinJoinServer::ChargeFees() const Shuffle(vecOffendersCollaterals.begin(), vecOffendersCollaterals.end(), FastRandomContext()); if (nState == POOL_STATE_ACCEPTING_ENTRIES || nState == POOL_STATE_SIGNING) { - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't %s transaction), charging fees: %s", /* Continued */ - (nState == POOL_STATE_SIGNING) ? "sign" : "send", vecOffendersCollaterals[0]->ToString()); + LogPrint(BCLog::COINJOIN, /* Continued */ + "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't %s transaction), charging fees: %s", + (nState == POOL_STATE_SIGNING) ? "sign" : "send", vecOffendersCollaterals[0]->ToString()); ConsumeCollateral(vecOffendersCollaterals[0]); } } @@ -465,7 +470,8 @@ void CCoinJoinServer::ChargeRandomFees() const { for (const auto& txCollateral : vecSessionCollaterals) { if (GetRand(/*nMax=*/100) > 10) return; - LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeRandomFees -- charging random fees, txCollateral=%s", txCollateral->ToString()); /* Continued */ + LogPrint(BCLog::COINJOIN, /* Continued */ + "CCoinJoinServer::ChargeRandomFees -- charging random fees, txCollateral=%s", txCollateral->ToString()); ConsumeCollateral(txCollateral); } } @@ -476,7 +482,7 @@ void CCoinJoinServer::ConsumeCollateral(const CTransactionRef& txref) const if (!ATMPIfSaneFee(m_chainman, txref)) { LogPrint(BCLog::COINJOIN, "%s -- ATMPIfSaneFee failed\n", __func__); } else { - m_peerman.RelayTransaction(txref->GetHash()); + m_peer_manager->PeerRelayTransaction(txref->GetHash()); LogPrint(BCLog::COINJOIN, "%s -- Collateral was consumed\n", __func__); } } @@ -521,7 +527,7 @@ void CCoinJoinServer::CheckForCompleteQueue() LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */ "with %d participants\n", dsq.ToString(), vecSessionCollaterals.size()); dsq.vchSig = m_mn_activeman.SignBasic(dsq.GetSignatureHash()); - m_peerman.RelayDSQ(dsq); + m_peer_manager->PeerRelayDSQ(dsq); WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq)); } } @@ -731,7 +737,7 @@ bool CCoinJoinServer::CreateNewSession(const CCoinJoinAccept& dsa, PoolMessage& GetAdjustedTime(), false); LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString()); dsq.vchSig = m_mn_activeman.SignBasic(dsq.GetSignatureHash()); - m_peerman.RelayDSQ(dsq); + m_peer_manager->PeerRelayDSQ(dsq); LOCK(cs_vecqueue); vecCoinJoinQueue.push_back(dsq); } diff --git a/src/coinjoin/server.h b/src/coinjoin/server.h index cae4b8f8dddb..67e697a1549e 100644 --- a/src/coinjoin/server.h +++ b/src/coinjoin/server.h @@ -6,13 +6,12 @@ #define BITCOIN_COINJOIN_SERVER_H #include -#include +#include #include #include class CActiveMasternodeManager; -class CCoinJoinServer; class CConnman; class CDataStream; class CDeterministicMNManager; @@ -21,13 +20,12 @@ class ChainstateManager; class CMasternodeMetaMan; class CNode; class CTxMemPool; -class PeerManager; class UniValue; /** Used to keep track of current status of mixing pool */ -class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager +class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager, public NetHandler { private: ChainstateManager& m_chainman; @@ -36,7 +34,6 @@ class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager CDSTXManager& m_dstxman; CMasternodeMetaMan& m_mn_metaman; CTxMemPool& mempool; - PeerManager& m_peerman; const CActiveMasternodeManager& m_mn_activeman; const CMasternodeSync& m_mn_sync; const llmq::CInstantSendManager& m_isman; @@ -87,7 +84,7 @@ class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager void RelayCompletedTransaction(PoolMessage nMessageID) EXCLUSIVE_LOCKS_REQUIRED(!cs_coinjoin); void ProcessDSACCEPT(CNode& peer, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); - [[nodiscard]] MessageProcessingResult ProcessDSQUEUE(NodeId from, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); + void ProcessDSQUEUE(NodeId from, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); void ProcessDSVIN(CNode& peer, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_coinjoin); void ProcessDSSIGNFINALTX(CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_coinjoin); @@ -97,13 +94,13 @@ class CCoinJoinServer : public CCoinJoinBaseSession, public CCoinJoinBaseManager CCoinJoinServer() = delete; CCoinJoinServer(const CCoinJoinServer&) = delete; CCoinJoinServer& operator=(const CCoinJoinServer&) = delete; - explicit CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman, CDeterministicMNManager& dmnman, - CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, + explicit CCoinJoinServer(PeerManagerInternal* peer_manager, ChainstateManager& chainman, CConnman& _connman, + CDeterministicMNManager& dmnman, CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, + CTxMemPool& mempool, const CActiveMasternodeManager& mn_activeman, const CMasternodeSync& mn_sync, const llmq::CInstantSendManager& isman); ~CCoinJoinServer(); - [[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, std::string_view msg_type, CDataStream& vRecv); + void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) override; bool HasTimedOut() const; void CheckTimeout(); diff --git a/src/init.cpp b/src/init.cpp index dcd0923bdd5d..214568a58db9 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2199,15 +2199,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // ********************************************************* Step 7c: Setup masternode mode assert(!node.active_ctx); assert(!g_active_notification_interface); + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, *node.llmq_ctx->qman, chainman.ActiveChainstate())); + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman)); if (node.mn_activeman) { - node.active_ctx = std::make_unique(chainman, *node.connman, *node.dmnman, *node.dstxman, *node.govman, *node.mn_metaman, + std::unique_ptr cj_server = std::make_unique(node.peerman.get(), chainman, *node.connman, *node.dmnman, *node.dstxman, *node.mn_metaman, *node.mempool, *node.mn_activeman, *node.mn_sync, *node.llmq_ctx->isman); + + node.active_ctx = std::make_unique(chainman, *node.connman, *node.dmnman, *node.govman, *node.mnhf_manager, *node.sporkman, *node.mempool, *node.llmq_ctx, *node.peerman, - *node.mn_activeman, *node.mn_sync); + *node.mn_activeman, *node.mn_sync, *cj_server); + node.peerman->AddExtraHandler(std::move(cj_server)); g_active_notification_interface = std::make_unique(*node.active_ctx, *node.mn_activeman); RegisterValidationInterface(g_active_notification_interface.get()); } - node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, *node.llmq_ctx->qman, chainman.ActiveChainstate())); - node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman)); // ********************************************************* Step 7d: Setup other Dash services @@ -2314,7 +2317,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->ScheduleHandlers(*node.scheduler); if (node.mn_activeman) { - node.scheduler->scheduleEvery(std::bind(&CCoinJoinServer::DoMaintenance, std::ref(*node.active_ctx->cj_server)), std::chrono::seconds{1}); + node.scheduler->scheduleEvery(std::bind(&CCoinJoinServer::DoMaintenance, std::ref(node.active_ctx->m_cj_server)), std::chrono::seconds{1}); node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.llmq_ctx->qdkgsman)), std::chrono::hours{1}); } diff --git a/src/masternode/active/context.cpp b/src/masternode/active/context.cpp index 8051d070591f..b603b2c9dd22 100644 --- a/src/masternode/active/context.cpp +++ b/src/masternode/active/context.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include #include @@ -18,13 +17,12 @@ #include ActiveContext::ActiveContext(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman, - CDSTXManager& dstxman, CGovernanceManager& govman, CMasternodeMetaMan& mn_metaman, - CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool, LLMQContext& llmq_ctx, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, - const CMasternodeSync& mn_sync) : + CGovernanceManager& govman, CMNHFManager& mnhfman, CSporkManager& sporkman, + CTxMemPool& mempool, LLMQContext& llmq_ctx, PeerManager& peerman, + const CActiveMasternodeManager& mn_activeman, const CMasternodeSync& mn_sync, + CoinJoinServer& cj_server) : m_llmq_ctx{llmq_ctx}, - cj_server{std::make_unique(chainman, connman, dmnman, dstxman, mn_metaman, mempool, peerman, - mn_activeman, mn_sync, *llmq_ctx.isman)}, + m_cj_server(cj_server), gov_signer{std::make_unique(connman, dmnman, govman, mn_activeman, chainman, mn_sync)}, shareman{std::make_unique(connman, chainman.ActiveChainstate(), *llmq_ctx.sigman, peerman, mn_activeman, *llmq_ctx.qman, sporkman)}, diff --git a/src/masternode/active/context.h b/src/masternode/active/context.h index c34fbae09d22..1d7c253a1da5 100644 --- a/src/masternode/active/context.h +++ b/src/masternode/active/context.h @@ -12,9 +12,7 @@ class ChainstateManager; class CCoinJoinServer; class CConnman; class CDeterministicMNManager; -class CDSTXManager; class CGovernanceManager; -class CMasternodeMetaMan; class CMasternodeSync; class CMNHFManager; class CSporkManager; @@ -43,10 +41,9 @@ struct ActiveContext { ActiveContext(const ActiveContext&) = delete; ActiveContext& operator=(const ActiveContext&) = delete; explicit ActiveContext(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman, - CDSTXManager& dstxman, CGovernanceManager& govman, CMasternodeMetaMan& mn_metaman, - CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool, LLMQContext& llmq_ctx, - PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, - const CMasternodeSync& mn_sync); + CGovernanceManager& govman, CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool, + LLMQContext& llmq_ctx, PeerManager& peerman, const CActiveMasternodeManager& mn_activeman, + const CMasternodeSync& mn_sync, CCoinJoinServer& cj_server); ~ActiveContext(); void Interrupt(); @@ -58,7 +55,7 @@ struct ActiveContext { * and are accessible in their own right * TODO: Move CActiveMasternodeManager here when dependents have been migrated */ - const std::unique_ptr cj_server; + CCoinJoinServer& m_cj_server; const std::unique_ptr gov_signer; const std::unique_ptr shareman; const std::unique_ptr ehf_sighandler; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index d69a44eeb4cb..8b5447359d82 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -652,6 +653,8 @@ class PeerManagerImpl final : public PeerManager void PeerRelayInv(const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void PeerRelayDSQ(const CCoinJoinQueue& queue) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void PeerRelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerAskPeersForTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); size_t PeerGetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, ::cs_main); void PeerPostProcessMessage(MessageProcessingResult&& ret) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -2348,13 +2351,16 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) case MSG_QUORUM_PREMATURE_COMMITMENT: return m_llmq_ctx->qdkgsman->AlreadyHave(inv); case MSG_QUORUM_RECOVERED_SIG: + // TODO: move it to NetSigning return m_llmq_ctx->sigman->AlreadyHave(inv); case MSG_CLSIG: return m_llmq_ctx->clhandler->AlreadyHave(inv); + // TODO: move it to NetInstantSend case MSG_ISDLOCK: return m_llmq_ctx->isman->AlreadyHave(inv); + // TODO: move it to CoinJoinServer case MSG_DSQ: - return (m_cj_walletman && m_cj_walletman->hasQueue(inv.hash)) || (m_active_ctx && m_active_ctx->cj_server->HasQueue(inv.hash)); + return (m_cj_walletman && m_cj_walletman->hasQueue(inv.hash)) || (m_active_ctx && m_active_ctx->m_cj_server.HasQueue(inv.hash)); case MSG_PLATFORM_BAN: return m_mn_metaman.AlreadyHavePlatformBan(inv.hash); } @@ -2972,7 +2978,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic } } if (!push && inv.type == MSG_DSQ) { - auto opt_dsq = m_active_ctx ? m_active_ctx->cj_server->GetQueueFromHash(inv.hash) : std::nullopt; + auto opt_dsq = m_active_ctx ? m_active_ctx->m_cj_server.GetQueueFromHash(inv.hash) : std::nullopt; if (m_cj_walletman && !opt_dsq.has_value()) { opt_dsq = m_cj_walletman->getQueueFromHash(inv.hash); } @@ -3604,9 +3610,6 @@ void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& result, NodeI for (const auto& inv : result.m_inventory) { RelayInv(inv); } - for (const auto& dsq : result.m_dsq) { - RelayDSQ(dsq); - } } MessageProcessingResult PeerManagerImpl::ProcessPlatformBanMessage(NodeId node, std::string_view msg_type, CDataStream& vRecv) @@ -5442,7 +5445,6 @@ void PeerManagerImpl::ProcessMessage( PostProcessMessage(m_cj_walletman->processMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv), pfrom.GetId()); } if (m_active_ctx) { - PostProcessMessage(m_active_ctx->cj_server->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); m_active_ctx->shareman->ProcessMessage(pfrom, msg_type, vRecv); } PostProcessMessage(m_sporkman.ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); @@ -6572,6 +6574,16 @@ void PeerManagerImpl::PeerRelayInvFiltered(const CInv& inv, const uint256& relat RelayInvFiltered(inv, relatedTxHash); } +void PeerManagerImpl::PeerRelayDSQ(const CCoinJoinQueue& queue) +{ + RelayDSQ(queue); +} + +void PeerManagerImpl::PeerRelayTransaction(const uint256& txid) +{ + RelayTransaction(txid); +} + void PeerManagerImpl::PeerAskPeersForTransaction(const uint256& txid) { AskPeersForTransaction(txid); diff --git a/src/net_processing.h b/src/net_processing.h index a449ee2a625b..824a46717654 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -17,7 +17,6 @@ class AddrMan; class CActiveMasternodeManager; class CCoinJoinQueue; -class CCoinJoinServer; class CDeterministicMNManager; class CDSTXManager; class CGovernanceManager; @@ -63,6 +62,8 @@ class PeerManagerInternal virtual void PeerRelayInv(const CInv& inv) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) = 0; + virtual void PeerRelayTransaction(const uint256& txid) = 0; + virtual void PeerRelayDSQ(const CCoinJoinQueue& queue) = 0; virtual void PeerAskPeersForTransaction(const uint256& txid) = 0; virtual size_t PeerGetRequestedObjectCount(NodeId nodeid) const = 0; virtual void PeerPostProcessMessage(MessageProcessingResult&& ret) = 0; diff --git a/src/rpc/coinjoin.cpp b/src/rpc/coinjoin.cpp index d3dd399dd35d..a21cfe7a27cb 100644 --- a/src/rpc/coinjoin.cpp +++ b/src/rpc/coinjoin.cpp @@ -465,7 +465,7 @@ static RPCHelpMan getcoinjoininfo() const NodeContext& node = EnsureAnyNodeContext(request.context); if (node.mn_activeman) { - node.active_ctx->cj_server->GetJsonInfo(obj); + node.active_ctx->m_cj_server.GetJsonInfo(obj); return obj; } diff --git a/test/lint/lint-circular-dependencies.py b/test/lint/lint-circular-dependencies.py index 03cae3d37b63..264d74a7a3a5 100755 --- a/test/lint/lint-circular-dependencies.py +++ b/test/lint/lint-circular-dependencies.py @@ -32,9 +32,9 @@ "chainlock/chainlock -> validation -> evo/chainhelper -> chainlock/chainlock", "chainlock/signing -> llmq/signing_shares -> net_processing -> masternode/active/context -> chainlock/signing", "coinjoin/coinjoin -> instantsend/instantsend -> spork -> msg_result -> coinjoin/coinjoin", + "coinjoin/coinjoin -> instantsend/instantsend -> instantsend/signing -> llmq/signing_shares -> net_processing -> coinjoin/coinjoin", "coinjoin/client -> coinjoin/coinjoin -> instantsend/instantsend -> instantsend/signing -> llmq/signing_shares -> net_processing -> coinjoin/walletman -> coinjoin/client", "coinjoin/server -> net_processing -> coinjoin/server", - "coinjoin/server -> net_processing -> masternode/active/context -> coinjoin/server", "common/bloom -> evo/assetlocktx -> llmq/commitment -> evo/deterministicmns -> evo/simplifiedmns -> merkleblock -> common/bloom", "common/bloom -> evo/assetlocktx -> llmq/quorums -> net -> common/bloom", "consensus/tx_verify -> evo/assetlocktx -> llmq/commitment -> validation -> consensus/tx_verify", From e8685c5a8c80e96d823157db087a755d445b829d Mon Sep 17 00:00:00 2001 From: Konstantin Akimov Date: Wed, 17 Dec 2025 02:48:23 +0700 Subject: [PATCH 2/3] refactor: use Scheduler's feature of NetHandler for coinjoin/server --- src/coinjoin/server.cpp | 21 +++++++++++++-------- src/coinjoin/server.h | 3 +-- src/init.cpp | 1 - src/masternode/active/context.cpp | 2 +- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/coinjoin/server.cpp b/src/coinjoin/server.cpp index 66217340ee64..e6752d729054 100644 --- a/src/coinjoin/server.cpp +++ b/src/coinjoin/server.cpp @@ -10,8 +10,9 @@ #include #include #include -#include #include +#include +#include #include