From 7b8553f630a3254a34f09c15420ba831dfcfbbb0 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 3 Dec 2025 11:05:25 -0600 Subject: [PATCH] Merge bitcoin/bitcoin#27626: Parallel compact block downloads, take 3 This is a backport of bitcoin/bitcoin#27626 This PR attempts to mitigate a single case, where high bandwidth peers can bail us out of a flakey peer not completing blocks for us. We allow up to 2 additional getblocktxns requests per unique block. This would hopefully allow the chance for an honest high bandwidth peer to hand us the transactions even if the first in flight peer stalls out. Changes: - Convert mapBlocksInFlight from std::map to std::multimap to allow multiple peers to be downloading the same block - Add IsBlockRequestedFromOutbound() to check if block is being fetched from an outbound peer - Update RemoveBlockRequest() to take optional from_peer parameter to selectively remove only that peers request - Add MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK = 3 constant - Remove nBlocksInFlight counter, use vBlocksInFlight.size() instead - Add parallel download test in p2p_compactblocks.py Generated with Claude Code Co-Authored-By: Claude --- src/net.h | 2 + src/net_processing.cpp | 243 ++++++++++++++++++--------- src/net_processing.h | 2 + src/rpc/blockchain.cpp | 2 +- test/functional/p2p_compactblocks.py | 88 +++++++++- 5 files changed, 257 insertions(+), 80 deletions(-) diff --git a/src/net.h b/src/net.h index f49f93c45449..21d11f01d66a 100644 --- a/src/net.h +++ b/src/net.h @@ -221,7 +221,9 @@ class CNodeStats int nVersion; std::string cleanSubVer; bool fInbound; + // We requested high bandwidth connection to peer bool m_bip152_highbandwidth_to; + // Peer requested high bandwidth connection bool m_bip152_highbandwidth_from; int m_starting_height; uint64_t nSendBytes; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 07aa813c4fe4..02e2ed88efea 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -455,7 +455,6 @@ struct CNodeState { std::list vBlocksInFlight; //! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. std::chrono::microseconds m_downloading_since{0us}; - int nBlocksInFlight{0}; //! Whether we consider this a preferred download peer. bool fPreferredDownload{false}; //! Whether this peer wants invs or headers (when possible) for block announcements. @@ -1018,11 +1017,17 @@ class PeerManagerImpl final : public PeerManager /** Have we requested this block from a peer */ bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Have we requested this block from an outbound peer */ + bool IsBlockRequestedFromOutbound(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Remove this block from our tracked requested blocks. Called if: - * - the block has been recieved from a peer + * - the block has been received from a peer * - the request for the block has timed out + * If "from_peer" is specified, then only remove the block if it is in + * flight from that peer (to avoid one peer's network traffic from + * affecting another's state). */ - void RemoveBlockRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void RemoveBlockRequest(const uint256& hash, std::optional from_peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main); /* Mark a block as in flight * Returns false, still setting pit, if the block was already in flight from the same peer @@ -1037,7 +1042,9 @@ class PeerManagerImpl final : public PeerManager */ void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - std::map::iterator> > mapBlocksInFlight GUARDED_BY(cs_main); + /* Multimap used to preserve insertion order */ + typedef std::multimap::iterator>> BlockDownloadMap; + BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_main); /** When our tip was last updated. */ std::atomic m_last_tip_update{0s}; @@ -1217,34 +1224,55 @@ std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::micros bool PeerManagerImpl::IsBlockRequested(const uint256& hash) { - return mapBlocksInFlight.find(hash) != mapBlocksInFlight.end(); + return mapBlocksInFlight.count(hash); } -void PeerManagerImpl::RemoveBlockRequest(const uint256& hash) +bool PeerManagerImpl::IsBlockRequestedFromOutbound(const uint256& hash) { - auto it = mapBlocksInFlight.find(hash); - if (it == mapBlocksInFlight.end()) { - // Block was not requested - return; + for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { + auto [nodeid, block_it] = range.first->second; + CNodeState& nodestate = *Assert(State(nodeid)); + if (!nodestate.m_is_inbound) return true; } - auto [node_id, list_it] = it->second; - CNodeState *state = State(node_id); - assert(state != nullptr); + return false; +} - if (state->vBlocksInFlight.begin() == list_it) { - // First block on the queue was received, update the start download time for the next one - state->m_downloading_since = std::max(state->m_downloading_since, GetTime()); +void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional from_peer) +{ + auto range = mapBlocksInFlight.equal_range(hash); + if (range.first == range.second) { + // Block was not requested from any peer + return; } - state->vBlocksInFlight.erase(list_it); - state->nBlocksInFlight--; - if (state->nBlocksInFlight == 0) { - // Last validated block on the queue was received. - m_peers_downloading_from--; + // We should not have requested too many of this block + Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK); + + while (range.first != range.second) { + auto [node_id, list_it] = range.first->second; + + if (from_peer && *from_peer != node_id) { + range.first++; + continue; + } + + CNodeState& state = *Assert(State(node_id)); + + if (state.vBlocksInFlight.begin() == list_it) { + // First block on the queue was received, update the start download time for the next one + state.m_downloading_since = std::max(state.m_downloading_since, GetTime()); + } + state.vBlocksInFlight.erase(list_it); + + if (state.vBlocksInFlight.empty()) { + // Last validated block on the queue for this peer was received. + m_peers_downloading_from--; + } + state.m_stalling_since = 0us; + + range.first = mapBlocksInFlight.erase(range.first); } - state->m_stalling_since = 0us; - mapBlocksInFlight.erase(it); } bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator **pit) @@ -1254,27 +1282,29 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st CNodeState *state = State(nodeid); assert(state != nullptr); + Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK); + // Short-circuit most stuff in case it is from the same node - std::map::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); - if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) { - if (pit) { - *pit = &itInFlight->second.second; + for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { + if (range.first->second.first == nodeid) { + if (pit) { + *pit = &range.first->second.second; + } + return false; } - return false; } - // Make sure it's not listed somewhere already. - RemoveBlockRequest(hash); + // Make sure it's not being fetched already from same peer. + RemoveBlockRequest(hash, nodeid); std::list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), {&block, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); - state->nBlocksInFlight++; - if (state->nBlocksInFlight == 1) { + if (state->vBlocksInFlight.size() == 1) { // We're starting a block download (batch) from this peer. state->m_downloading_since = GetTime(); m_peers_downloading_from++; } - itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first; + auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))); if (pit) { *pit = &itInFlight->second.second; } @@ -1475,7 +1505,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co } } else if (waitingfor == -1) { // This is the first already-in-flight block. - waitingfor = mapBlocksInFlight[pindex->GetBlockHash()].first; + waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first; } } } @@ -1755,12 +1785,20 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) { nSyncStarted--; for (const QueuedBlock& entry : state->vBlocksInFlight) { - mapBlocksInFlight.erase(entry.pindex->GetBlockHash()); + auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash()); + while (range.first != range.second) { + auto [node_id, list_it] = range.first->second; + if (node_id != nodeid) { + range.first++; + } else { + range.first = mapBlocksInFlight.erase(range.first); + } + } } m_orphanage.EraseForPeer(nodeid); if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid); m_num_preferred_download_peers -= state->fPreferredDownload; - m_peers_downloading_from -= (state->nBlocksInFlight != 0); + m_peers_downloading_from -= (!state->vBlocksInFlight.empty()); assert(m_peers_downloading_from >= 0); m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; assert(m_outbound_peers_with_protect_from_disconnect >= 0); @@ -1998,11 +2036,11 @@ std::optional PeerManagerImpl::FetchBlock(NodeId peer_id, const CBl if (peer == nullptr) return "Peer does not exist"; LOCK(cs_main); - // Mark block as in-flight unless it already is (for this peer). - // If the peer does not send us a block, vBlocksInFlight remains non-empty, - // causing us to timeout and disconnect. - // If a block was already in-flight for a different peer, its BLOCKTXN - // response will be dropped. + + // Forget about all prior requests + RemoveBlockRequest(block_index.GetBlockHash(), std::nullopt); + + // Mark block as in-flight if (!BlockRequested(peer_id, block_index)) return "Already requested from this peer"; // Construct message to request the block @@ -3136,7 +3174,7 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c std::vector vGetData; // Download as much as possible, from earliest to latest. for (const CBlockIndex *pindex : vToFetch | std::views::reverse) { - if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (nodestate->vBlocksInFlight.size() >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { // Can't download any more from this peer break; } @@ -4829,15 +4867,27 @@ void PeerManagerImpl::ProcessMessage( nodestate->m_last_block_announcement = GetTime(); } - std::map::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash()); - bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end(); - if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here return; + auto range_flight = mapBlocksInFlight.equal_range(pindex->GetBlockHash()); + size_t already_in_flight = std::distance(range_flight.first, range_flight.second); + bool requested_block_from_this_peer{false}; + + // Multimap ensures ordering of outstanding requests. It's either empty or first in line. + bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId()); + + while (range_flight.first != range_flight.second) { + if (range_flight.first->second.first == pfrom.GetId()) { + requested_block_from_this_peer = true; + break; + } + range_flight.first++; + } + if (pindex->nChainWork <= m_chainman.ActiveChain().Tip()->nChainWork || // We know something better pindex->nTx != 0) { // We had this block at some point, but pruned it - if (fAlreadyInFlight) { + if (requested_block_from_this_peer) { // We requested this block for some reason, but our mempool will probably be useless // so we just grab the block via normal getdata std::vector vInv(1); @@ -4848,14 +4898,15 @@ void PeerManagerImpl::ProcessMessage( } // If we're not close to tip yet, give up and let parallel block fetch work its magic - if (!fAlreadyInFlight && !CanDirectFetch()) + if (!already_in_flight && !CanDirectFetch()) { return; + } // We want to be a bit conservative just to be extra careful about DoS // possibilities in compact block processing... if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) { - if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || - (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { + if ((already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || + requested_block_from_this_peer) { std::list::iterator *queuedBlockIt = nullptr; if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) @@ -4870,15 +4921,20 @@ void PeerManagerImpl::ProcessMessage( PartiallyDownloadedBlock& partialBlock = *(*queuedBlockIt)->partialBlock; ReadStatus status = partialBlock.InitData(cmpctblock, vExtraTxnForCompact); if (status == READ_STATUS_INVALID) { - RemoveBlockRequest(pindex->GetBlockHash()); // Reset in-flight state in case Misbehaving does not result in a disconnect + RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId()); // Reset in-flight state in case Misbehaving does not result in a disconnect Misbehaving(pfrom.GetId(), 100, "invalid compact block"); return; } else if (status == READ_STATUS_FAILED) { - // Duplicate txindexes, the block is now in-flight, so just request it - std::vector vInv(1); - vInv[0] = CInv(MSG_BLOCK, blockhash); - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); - return; + if (first_in_flight) { + // Duplicate txindexes, the block is now in-flight, so just request it + std::vector vInv(1); + vInv[0] = CInv(MSG_BLOCK, blockhash); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + return; + } else { + // Give up for this peer and wait for other peer(s) + RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId()); + } } BlockTransactionsRequest req; @@ -4892,9 +4948,24 @@ void PeerManagerImpl::ProcessMessage( txn.blockhash = blockhash; blockTxnMsg << txn; fProcessBLOCKTXN = true; - } else { + } else if (first_in_flight) { + // We will try to round-trip any compact blocks we get on failure, + // as long as it's first... req.blockhash = pindex->GetBlockHash(); m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); + } else if (pfrom.m_bip152_highbandwidth_to && + (!pfrom.IsInboundConn() || + IsBlockRequestedFromOutbound(blockhash) || + already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK - 1)) { + // ... or it's a hb relay peer and: + // - peer is outbound, or + // - we already have an outbound attempt in flight(so we'll take what we can get), or + // - it's not the final parallel download slot (which we may reserve for first outbound) + req.blockhash = pindex->GetBlockHash(); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); + } else { + // Give up for this peer and wait for other peer(s) + RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId()); } } else { // This block is either already in flight from a different @@ -4915,7 +4986,7 @@ void PeerManagerImpl::ProcessMessage( } } } else { - if (fAlreadyInFlight) { + if (requested_block_from_this_peer) { // We requested this block, but its far into the future, so our // mempool will probably be useless - request the block normally std::vector vInv(1); @@ -4964,7 +5035,7 @@ void PeerManagerImpl::ProcessMessage( // process from some other peer. We do this after calling // ProcessNewBlock so that a malleated cmpctblock announcement // can't be used to interfere with block relay. - RemoveBlockRequest(pblock->GetHash()); + RemoveBlockRequest(pblock->GetHash(), std::nullopt); } } return; @@ -4986,24 +5057,44 @@ void PeerManagerImpl::ProcessMessage( { LOCK(cs_main); - std::map::iterator> >::iterator it = mapBlocksInFlight.find(resp.blockhash); - if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock || - it->second.first != pfrom.GetId()) { + auto range_flight = mapBlocksInFlight.equal_range(resp.blockhash); + size_t already_in_flight = std::distance(range_flight.first, range_flight.second); + bool requested_block_from_this_peer{false}; + + // Multimap ensures ordering of outstanding requests. It's either empty or first in line. + bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId()); + + while (range_flight.first != range_flight.second) { + auto [node_id, block_it] = range_flight.first->second; + if (node_id == pfrom.GetId() && block_it->partialBlock) { + requested_block_from_this_peer = true; + break; + } + range_flight.first++; + } + + if (!requested_block_from_this_peer) { LogPrint(BCLog::NET, "Peer %d sent us block transactions for block we weren't expecting\n", pfrom.GetId()); return; } - PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock; + PartiallyDownloadedBlock& partialBlock = *range_flight.first->second.second->partialBlock; ReadStatus status = partialBlock.FillBlock(*pblock, resp.txn); if (status == READ_STATUS_INVALID) { - RemoveBlockRequest(resp.blockhash); // Reset in-flight state in case Misbehaving does not result in a disconnect + RemoveBlockRequest(resp.blockhash, pfrom.GetId()); // Reset in-flight state in case Misbehaving does not result in a disconnect Misbehaving(pfrom.GetId(), 100, "invalid compact block/non-matching block transactions"); return; } else if (status == READ_STATUS_FAILED) { - // Might have collided, fall back to getdata now :( - std::vector invs; - invs.push_back(CInv(MSG_BLOCK, resp.blockhash)); - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); + if (first_in_flight) { + // Might have collided, fall back to getdata now :( + std::vector invs; + invs.push_back(CInv(MSG_BLOCK, resp.blockhash)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); + } else { + RemoveBlockRequest(resp.blockhash, pfrom.GetId()); + LogPrint(BCLog::NET, "Peer %d sent us a compact block but it failed to reconstruct, waiting on first download to complete\n", pfrom.GetId()); + return; + } } else { // Block is either okay, or possibly we received // READ_STATUS_CHECKBLOCK_FAILED. @@ -5022,7 +5113,7 @@ void PeerManagerImpl::ProcessMessage( // though the block was successfully read, and rely on the // handling in ProcessNewBlock to ensure the block index is // updated, etc. - RemoveBlockRequest(resp.blockhash); // it is now an empty pointer + RemoveBlockRequest(resp.blockhash, std::nullopt); // it is now an empty pointer fBlockRead = true; // mapBlockSource is used for potentially punishing peers and // updating which peers send us compact blocks, so the race @@ -5104,7 +5195,7 @@ void PeerManagerImpl::ProcessMessage( // Always process the block if we requested it, since we may // need it even when it's not a candidate for a new best tip. forceProcessing = IsBlockRequested(hash); - RemoveBlockRequest(hash); + RemoveBlockRequest(hash, std::nullopt); // mapBlockSource is only used for punishing peers and setting // which peers send us compact blocks, so the race between here and // cs_main in ProcessNewBlock is fine. @@ -5673,14 +5764,14 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // valid headers chain with at least as much work as our tip. CNodeState *node_state = State(pnode->GetId()); if (node_state == nullptr || - (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->nBlocksInFlight == 0)) { + (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->vBlocksInFlight.empty())) { pnode->fDisconnect = true; LogPrint(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n", pnode->GetId(), count_seconds(pnode->m_last_block_time)); return true; } else { LogPrint(BCLog::NET, "keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), count_seconds(pnode->m_connected), node_state->nBlocksInFlight); + pnode->GetId(), count_seconds(pnode->m_connected), node_state->vBlocksInFlight.size()); } return false; }); @@ -5731,13 +5822,13 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // Also don't disconnect any peer we're trying to download a // block from. CNodeState &state = *State(pnode->GetId()); - if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.nBlocksInFlight == 0) { + if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.vBlocksInFlight.empty()) { LogPrint(BCLog::NET, "disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->GetId(), oldest_block_announcement); pnode->fDisconnect = true; return true; } else { LogPrint(BCLog::NET, "keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), count_seconds(pnode->m_connected), state.nBlocksInFlight); + pnode->GetId(), count_seconds(pnode->m_connected), state.vBlocksInFlight.size()); return false; } }); @@ -6444,17 +6535,17 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Message: getdata (blocks) // std::vector vGetData; - if (CanServeBlocks(*peer) && pto->CanRelay() && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (CanServeBlocks(*peer) && pto->CanRelay() && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector vToDownload; NodeId staller = -1; - FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller); + FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.vBlocksInFlight.size(), vToDownload, staller); for (const CBlockIndex *pindex : vToDownload) { vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash())); BlockRequested(pto->GetId(), *pindex); LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->nHeight, pto->GetId()); } - if (state.nBlocksInFlight == 0 && staller != -1) { + if (state.vBlocksInFlight.empty() && staller != -1) { if (State(staller)->m_stalling_since == 0us) { State(staller)->m_stalling_since = current_time; LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); diff --git a/src/net_processing.h b/src/net_processing.h index 7dc68e272d53..7defef036dcb 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -40,6 +40,8 @@ static const bool DEFAULT_PEERBLOOMFILTERS = true; static const bool DEFAULT_PEERBLOCKFILTERS = false; /** Threshold for marking a node to be discouraged, e.g. disconnected and added to the discouragement filter. */ static const int DISCOURAGEMENT_THRESHOLD{100}; +/** Maximum number of outstanding CMPCTBLOCK requests for the same block. */ +static const unsigned int MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK = 3; struct CNodeStateStats { int m_misbehavior_score = 0; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index 8401e186c750..b63f240c0fd3 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -484,7 +484,7 @@ static RPCHelpMan getblockfrompeer() "getblockfrompeer", "Attempt to fetch block from a given peer.\n\n" "We must have the header for this block, e.g. using submitheader.\n" - "Subsequent calls for the same block and a new peer will cause the response from the previous peer to be ignored.\n" + "Subsequent calls for the same block may cause the response from the previous peer to be ignored.\n" "Peers generally ignore requests for a stale block that they never fully verified, or one that is more than a month old.\n" "When a peer does not respond with a block, we will disconnect.\n\n" "Returns an empty JSON object if the request was successfully scheduled.", diff --git a/test/functional/p2p_compactblocks.py b/test/functional/p2p_compactblocks.py index b0ee1e2a09d6..3200defde3c9 100755 --- a/test/functional/p2p_compactblocks.py +++ b/test/functional/p2p_compactblocks.py @@ -102,6 +102,10 @@ def clear_block_announcement(self): self.last_message.pop("headers", None) self.last_message.pop("cmpctblock", None) + def clear_getblocktxn(self): + with p2p_lock: + self.last_message.pop("getblocktxn", None) + def get_headers(self, locator, hashstop): msg = msg_getheaders() msg.locator.vHave = locator @@ -708,7 +712,7 @@ def request_cb_announcements(self, peer): peer.get_headers(locator=[int(tip, 16)], hashstop=0) peer.send_and_ping(msg_sendcmpct(announce=True, version=1)) - def test_compactblock_reconstruction_multiple_peers(self, stalling_peer, delivery_peer): + def test_compactblock_reconstruction_stalling_peer(self, stalling_peer, delivery_peer): node = self.nodes[0] assert len(self.utxos) @@ -784,12 +788,85 @@ def assert_highbandwidth_states(node, hb_to, hb_from): hb_test_node.send_and_ping(msg_sendcmpct(announce=False, version=1)) assert_highbandwidth_states(self.nodes[0], hb_to=True, hb_from=False) + def test_compactblock_reconstruction_parallel_reconstruction(self, stalling_peer, delivery_peer, inbound_peer, outbound_peer): + """ All p2p connections are inbound except outbound_peer. We test that ultimate parallel slot + can only be taken by an outbound node unless prior attempts were done by an outbound + """ + node = self.nodes[0] + assert len(self.utxos) + + def announce_cmpct_block(node, peer, txn_count): + utxo = self.utxos.pop(0) + block = self.build_block_with_transactions(node, utxo, txn_count) + + cmpct_block = HeaderAndShortIDs() + cmpct_block.initialize_from_block(block) + msg = msg_cmpctblock(cmpct_block.to_p2p()) + peer.send_and_ping(msg) + with p2p_lock: + assert "getblocktxn" in peer.last_message + return block, cmpct_block + + for name, peer in [("delivery", delivery_peer), ("inbound", inbound_peer), ("outbound", outbound_peer)]: + self.log.info(f"Setting {name} as high bandwidth peer") + block, cmpct_block = announce_cmpct_block(node, peer, 1) + msg = msg_blocktxn() + msg.block_transactions.blockhash = block.sha256 + msg.block_transactions.transactions = block.vtx[1:] + peer.send_and_ping(msg) + assert_equal(int(node.getbestblockhash(), 16), block.sha256) + peer.clear_getblocktxn() + + # Test the simple parallel download case... + for num_missing in [1, 5, 20]: + + # Remaining low-bandwidth peer is stalling_peer, who announces first + assert_equal([peer['bip152_hb_to'] for peer in node.getpeerinfo()], [False, True, True, True]) + + block, cmpct_block = announce_cmpct_block(node, stalling_peer, num_missing) + + delivery_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p())) + with p2p_lock: + # The second peer to announce should still get a getblocktxn + assert "getblocktxn" in delivery_peer.last_message + assert int(node.getbestblockhash(), 16) != block.sha256 + + inbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p())) + with p2p_lock: + # The third inbound peer to announce should *not* get a getblocktxn + assert "getblocktxn" not in inbound_peer.last_message + assert int(node.getbestblockhash(), 16) != block.sha256 + + outbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p())) + with p2p_lock: + # The third peer to announce should get a getblocktxn if outbound + assert "getblocktxn" in outbound_peer.last_message + assert int(node.getbestblockhash(), 16) != block.sha256 + + # Second peer completes the compact block first + msg = msg_blocktxn() + msg.block_transactions.blockhash = block.sha256 + msg.block_transactions.transactions = block.vtx[1:] + delivery_peer.send_and_ping(msg) + assert_equal(int(node.getbestblockhash(), 16), block.sha256) + + # Nothing bad should happen if we get a late fill from the first peer... + stalling_peer.send_and_ping(msg) + self.utxos.append([block.vtx[-1].sha256, 0, block.vtx[-1].vout[0].nValue]) + + delivery_peer.clear_getblocktxn() + inbound_peer.clear_getblocktxn() + outbound_peer.clear_getblocktxn() + + def run_test(self): self.wallet = MiniWallet(self.nodes[0]) # Setup the p2p connections self.test_node = self.nodes[0].add_p2p_connection(TestP2PConn()) self.additional_test_node = self.nodes[0].add_p2p_connection(TestP2PConn(), services=NODE_NETWORK | NODE_HEADERS_COMPRESSED) + self.onemore_inbound_node = self.nodes[0].add_p2p_connection(TestP2PConn()) + self.outbound_node = self.nodes[0].add_outbound_p2p_connection(TestP2PConn(), p2p_idx=3, connection_type="outbound-full-relay") # We will need UTXOs to construct transactions in later tests. self.make_utxos() @@ -797,6 +874,8 @@ def run_test(self): self.log.info("Testing SENDCMPCT p2p message... ") self.test_sendcmpct(self.test_node) self.test_sendcmpct(self.additional_test_node) + self.test_sendcmpct(self.onemore_inbound_node) + self.test_sendcmpct(self.outbound_node) self.log.info("Testing compactblock construction...") self.test_compactblock_construction(self.test_node) @@ -813,8 +892,11 @@ def run_test(self): self.log.info("Testing handling of incorrect blocktxn responses...") self.test_incorrect_blocktxn_response(self.test_node) - self.log.info("Testing reconstructing compact blocks from all peers...") - self.test_compactblock_reconstruction_multiple_peers(self.test_node, self.additional_test_node) + self.log.info("Testing reconstructing compact blocks with a stalling peer...") + self.test_compactblock_reconstruction_stalling_peer(self.test_node, self.additional_test_node) + + self.log.info("Testing reconstructing compact blocks from multiple peers...") + self.test_compactblock_reconstruction_parallel_reconstruction(stalling_peer=self.test_node, inbound_peer=self.onemore_inbound_node, delivery_peer=self.additional_test_node, outbound_peer=self.outbound_node) # End-to-end block relay tests self.log.info("Testing end-to-end block relay...")