Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ void Shutdown(NodeContext& node)

StopTorControl();

if (node.chainman) node.chainman->InterruptConnectorThread();
if (node.background_init_thread.joinable()) node.background_init_thread.join();
if (node.chainman) node.chainman->JoinConnectorThread();
// After everything has been shut down, but before things get flushed, stop the
// the scheduler. After this point, SyncWithValidationInterfaceQueue() should not be called anymore
// as this would prevent the shutdown from completing.
Expand Down Expand Up @@ -2017,6 +2019,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
vImportFiles.push_back(fs::PathFromString(strFile));
}

chainman.StartConnectorThread();

node.background_init_thread = std::thread(&util::TraceThread, "initload", [=, &chainman, &args, &node] {
ScheduleBatchPriority();
// Import blocks and ActivateBestChain()
Expand Down
35 changes: 28 additions & 7 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ static const unsigned int MAX_INV_SZ = 50000;
static const unsigned int MAX_GETDATA_SZ = 1000;
/** Number of blocks that can be requested at any given time from a single peer. */
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
/** Number of blocks that can be requested at any given time from a single peer during IBD. */
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER_IBD = 64;
/** Default time during which a peer must stall block download progress before being disconnected.
* the actual timeout is increased temporarily if peers are disconnected for hitting the timeout */
static constexpr auto BLOCK_STALLING_TIMEOUT_DEFAULT{2s};
Expand All @@ -144,6 +146,8 @@ static_assert(MAX_BLOCKTXN_DEPTH <= MIN_BLOCKS_TO_KEEP, "MAX_BLOCKTXN_DEPTH too
* degree of disordering of blocks on disk (which make reindexing and pruning harder). We'll probably
* want to make this a per-peer adaptive value at some point. */
static const unsigned int BLOCK_DOWNLOAD_WINDOW = 1024;
/** Block download window during IBD, when the connector thread processes blocks asynchronously. */
static const unsigned int BLOCK_DOWNLOAD_WINDOW_IBD = 8192;
/** Block download timeout base, expressed in multiples of the block interval (i.e. 10 min) */
static constexpr double BLOCK_DOWNLOAD_TIMEOUT_BASE = 1;
/** Additional block download timeout per parallel downloading peer (i.e. 5 min) */
Expand Down Expand Up @@ -1437,9 +1441,23 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co
// Never fetch further than the best block we know the peer has, or more than BLOCK_DOWNLOAD_WINDOW + 1 beyond the last
// linked block we have in common with this peer. The +1 is so we can detect stalling, namely if we would be able to
// download that next block if the window were 1 larger.
int nWindowEnd = state->pindexLastCommonBlock->nHeight + BLOCK_DOWNLOAD_WINDOW;
const unsigned int download_window = m_chainman.IsInitialBlockDownload() ? BLOCK_DOWNLOAD_WINDOW_IBD : BLOCK_DOWNLOAD_WINDOW;
int nWindowEnd = state->pindexLastCommonBlock->nHeight + download_window;

FindNextBlocks(vBlocks, peer, state, pindexWalk, count, nWindowEnd, &m_chainman.ActiveChain(), &nodeStaller);

// pindexLastCommonBlock may have advanced during the walk (blocks stored
// but not yet connected to the active chain). Recompute the window and
// retry to avoid false stall detection and to request newly-visible blocks.
if (state->pindexLastCommonBlock->nHeight > pindexWalk->nHeight) {
int nNewWindowEnd = state->pindexLastCommonBlock->nHeight + download_window;
if (nNewWindowEnd > nWindowEnd) {
nodeStaller = -1;
FindNextBlocks(vBlocks, peer, state, state->pindexLastCommonBlock,
count - vBlocks.size(), nNewWindowEnd,
&m_chainman.ActiveChain(), &nodeStaller);
}
}
}

void PeerManagerImpl::TryDownloadingHistoricalBlocks(const Peer& peer, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, const CBlockIndex *from_tip, const CBlockIndex* target_block)
Expand Down Expand Up @@ -1468,7 +1486,8 @@ void PeerManagerImpl::TryDownloadingHistoricalBlocks(const Peer& peer, unsigned
return;
}

FindNextBlocks(vBlocks, peer, state, from_tip, count, std::min<int>(from_tip->nHeight + BLOCK_DOWNLOAD_WINDOW, target_block->nHeight));
const unsigned int download_window = m_chainman.IsInitialBlockDownload() ? BLOCK_DOWNLOAD_WINDOW_IBD : BLOCK_DOWNLOAD_WINDOW;
FindNextBlocks(vBlocks, peer, state, from_tip, count, std::min<int>(from_tip->nHeight + download_window, target_block->nHeight));
}

void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks, const Peer& peer, CNodeState *state, const CBlockIndex *pindexWalk, unsigned int count, int nWindowEnd, const CChain* activeChain, NodeId* nodeStaller)
Expand Down Expand Up @@ -3427,7 +3446,8 @@ void PeerManagerImpl::ProcessGetCFCheckPt(CNode& node, Peer& peer, DataStream& v
void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr<const CBlock>& block, bool force_processing, bool min_pow_checked)
{
bool new_block{false};
m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block);
const bool activate_chain = !m_chainman.IsInitialBlockDownload();
m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block, activate_chain);
if (new_block) {
node.m_last_block_time = GetTime<std::chrono::seconds>();
// In case this block came from a different peer than we requested
Expand Down Expand Up @@ -5151,7 +5171,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string
std::vector<CInv> vInv;
vRecv >> vInv;
std::vector<GenTxid> tx_invs;
if (vInv.size() <= node::MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
if (vInv.size() <= node::MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER_IBD) {
for (CInv &inv : vInv) {
if (inv.IsGenTxMsg()) {
tx_invs.emplace_back(ToGenTxid(inv));
Expand Down Expand Up @@ -6162,11 +6182,12 @@ bool PeerManagerImpl::SendMessages(CNode& node)
// Message: getdata (blocks)
//
std::vector<CInv> vGetData;
if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
const int max_blocks_in_transit = m_chainman.IsInitialBlockDownload() ? MAX_BLOCKS_IN_TRANSIT_PER_PEER_IBD : MAX_BLOCKS_IN_TRANSIT_PER_PEER;
if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && static_cast<int>(state.vBlocksInFlight.size()) < max_blocks_in_transit) {
std::vector<const CBlockIndex*> vToDownload;
NodeId staller = -1;
auto get_inflight_budget = [&state]() {
return std::max(0, MAX_BLOCKS_IN_TRANSIT_PER_PEER - static_cast<int>(state.vBlocksInFlight.size()));
auto get_inflight_budget = [&state, max_blocks_in_transit]() {
return std::max(0, max_blocks_in_transit - static_cast<int>(state.vBlocksInFlight.size()));
};

// If there are multiple chainstates, download blocks for the
Expand Down
49 changes: 42 additions & 7 deletions src/primitives/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <uint256.h>
#include <util/time.h>

#include <atomic>
#include <cstdint>
#include <string>
#include <utility>
Expand Down Expand Up @@ -76,10 +77,10 @@ class CBlock : public CBlockHeader
// network and disk
std::vector<CTransactionRef> vtx;

// Memory-only flags for caching expensive checks
mutable bool fChecked; // CheckBlock()
mutable bool m_checked_witness_commitment{false}; // CheckWitnessCommitment()
mutable bool m_checked_merkle_root{false}; // CheckMerkleRoot()
// Memory-only flags for caching expensive checks (atomic for thread safety)
mutable std::atomic<bool> fChecked{false}; // CheckBlock()
mutable std::atomic<bool> m_checked_witness_commitment{false}; // CheckWitnessCommitment()
mutable std::atomic<bool> m_checked_merkle_root{false}; // CheckMerkleRoot()

CBlock()
{
Expand All @@ -92,6 +93,40 @@ class CBlock : public CBlockHeader
*(static_cast<CBlockHeader*>(this)) = header;
}

CBlock(const CBlock& other) : CBlockHeader(other), vtx(other.vtx),
fChecked(other.fChecked.load(std::memory_order_relaxed)),
m_checked_witness_commitment(other.m_checked_witness_commitment.load(std::memory_order_relaxed)),
m_checked_merkle_root(other.m_checked_merkle_root.load(std::memory_order_relaxed)) {}

CBlock(CBlock&& other) noexcept : CBlockHeader(std::move(other)), vtx(std::move(other.vtx)),
fChecked(other.fChecked.load(std::memory_order_relaxed)),
m_checked_witness_commitment(other.m_checked_witness_commitment.load(std::memory_order_relaxed)),
m_checked_merkle_root(other.m_checked_merkle_root.load(std::memory_order_relaxed)) {}

CBlock& operator=(const CBlock& other)
{
if (this != &other) {
CBlockHeader::operator=(other);
vtx = other.vtx;
fChecked.store(other.fChecked.load(std::memory_order_relaxed), std::memory_order_relaxed);
m_checked_witness_commitment.store(other.m_checked_witness_commitment.load(std::memory_order_relaxed), std::memory_order_relaxed);
m_checked_merkle_root.store(other.m_checked_merkle_root.load(std::memory_order_relaxed), std::memory_order_relaxed);
}
return *this;
}

CBlock& operator=(CBlock&& other) noexcept
{
if (this != &other) {
CBlockHeader::operator=(std::move(other));
vtx = std::move(other.vtx);
fChecked.store(other.fChecked.load(std::memory_order_relaxed), std::memory_order_relaxed);
m_checked_witness_commitment.store(other.m_checked_witness_commitment.load(std::memory_order_relaxed), std::memory_order_relaxed);
m_checked_merkle_root.store(other.m_checked_merkle_root.load(std::memory_order_relaxed), std::memory_order_relaxed);
}
return *this;
}

SERIALIZE_METHODS(CBlock, obj)
{
READWRITE(AsBase<CBlockHeader>(obj), obj.vtx);
Expand All @@ -101,9 +136,9 @@ class CBlock : public CBlockHeader
{
CBlockHeader::SetNull();
vtx.clear();
fChecked = false;
m_checked_witness_commitment = false;
m_checked_merkle_root = false;
fChecked.store(false, std::memory_order_relaxed);
m_checked_witness_commitment.store(false, std::memory_order_relaxed);
m_checked_merkle_root.store(false, std::memory_order_relaxed);
}

std::string ToString() const;
Expand Down
Loading