Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/bench/checkqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,13 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench)

struct PrevectorJob {
prevector<PREVECTOR_SIZE, uint8_t> p;
PrevectorJob() = default;
explicit PrevectorJob(FastRandomContext& insecure_rand){
p.resize(insecure_rand.randrange(PREVECTOR_SIZE*2));
}
bool operator()()
{
return true;
}
void swap(PrevectorJob& x) noexcept
{
p.swap(x.p);
};
};
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};

Expand All @@ -61,7 +56,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench)
// Make insecure_rand here so that each iteration is identical.
CCheckQueueControl<PrevectorJob> control(&queue);
for (auto vChecks : vBatches) {
control.Add(vChecks);
control.Add(std::move(vChecks));
}
// control waits for completion by RAII, but
// it is done explicitly here for clarity
Expand Down
25 changes: 10 additions & 15 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <util/threadnames.h>

#include <algorithm>
#include <iterator>
#include <vector>

template <typename T>
Expand Down Expand Up @@ -110,13 +111,9 @@ class CCheckQueue
// * Try to account for idle jobs which will instantly start helping.
// * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
vChecks.resize(nNow);
for (unsigned int i = 0; i < nNow; i++) {
// We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
// queue to the local batch vector instead of copying.
vChecks[i].swap(queue.back());
queue.pop_back();
}
auto start_it = queue.end() - nNow;
vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
queue.erase(start_it, queue.end());
// Check whether we need to do work at all
fOk = fAllOk;
}
Expand Down Expand Up @@ -163,18 +160,15 @@ class CCheckQueue
}

//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
if (vChecks.empty()) {
return;
}

{
LOCK(m_mutex);
for (T& check : vChecks) {
queue.emplace_back();
check.swap(queue.back());
}
queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
nTodo += vChecks.size();
}

Expand Down Expand Up @@ -236,10 +230,11 @@ class CCheckQueueControl
return fRet;
}

void Add(std::vector<T>& vChecks)
void Add(std::vector<T>&& vChecks)
{
if (pqueue != nullptr)
pqueue->Add(vChecks);
if (pqueue != nullptr) {
pqueue->Add(std::move(vChecks));
}
}

~CCheckQueueControl()
Expand Down
3 changes: 2 additions & 1 deletion src/rpc/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ static RPCHelpMan getblockfrompeer()
"We must have the header for this block, e.g. using submitheader.\n"
"Subsequent calls for the same block and a new peer will cause the response from the previous peer to be ignored.\n"
"Peers generally ignore requests for a stale block that they never fully verified, or one that is more than a month old.\n"
"When a peer does not respond with a block, we will disconnect.\n\n"
"When a peer does not respond with a block, we will disconnect.\n"
"Note: The block could be re-pruned as soon as it is received.\n\n"
"Returns an empty JSON object if the request was successfully scheduled.",
{
{"blockhash", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The block hash to try to fetch"},
Expand Down
58 changes: 21 additions & 37 deletions src/test/checkqueue_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ struct FakeCheck {
{
return true;
}
void swap(FakeCheck& x) noexcept {};
};

struct FakeCheckCheckCompletion {
Expand All @@ -52,39 +51,28 @@ struct FakeCheckCheckCompletion {
n_calls.fetch_add(1, std::memory_order_relaxed);
return true;
}
void swap(FakeCheckCheckCompletion& x) noexcept {};
};

struct FailingCheck {
bool fails;
FailingCheck(bool _fails) : fails(_fails){};
FailingCheck() : fails(true){};
bool operator()() const
{
return !fails;
}
void swap(FailingCheck& x) noexcept
{
std::swap(fails, x.fails);
};
};

struct UniqueCheck {
static Mutex m;
static std::unordered_multiset<size_t> results GUARDED_BY(m);
size_t check_id;
UniqueCheck(size_t check_id_in) : check_id(check_id_in){};
UniqueCheck() : check_id(0){};
bool operator()()
{
LOCK(m);
results.insert(check_id);
return true;
}
void swap(UniqueCheck& x) noexcept
{
std::swap(x.check_id, check_id);
};
};


Expand All @@ -95,7 +83,6 @@ struct MemoryCheck {
{
return true;
}
MemoryCheck() = default;
MemoryCheck(const MemoryCheck& x)
{
// We have to do this to make sure that destructor calls are paired
Expand All @@ -112,19 +99,13 @@ struct MemoryCheck {
{
fake_allocated_memory.fetch_sub(b, std::memory_order_relaxed);
};
void swap(MemoryCheck& x) noexcept
{
std::swap(b, x.b);
};
};

struct FrozenCleanupCheck {
static std::atomic<uint64_t> nFrozen;
static std::condition_variable cv;
static std::mutex m;
// Freezing can't be the default initialized behavior given how the queue
// swaps in default initialized Checks.
bool should_freeze {false};
bool should_freeze{true};
bool operator()() const
{
return true;
Expand All @@ -139,10 +120,17 @@ struct FrozenCleanupCheck {
cv.wait(l, []{ return nFrozen.load(std::memory_order_relaxed) == 0;});
}
}
void swap(FrozenCleanupCheck& x) noexcept
FrozenCleanupCheck(FrozenCleanupCheck&& other) noexcept
{
std::swap(should_freeze, x.should_freeze);
};
should_freeze = other.should_freeze;
other.should_freeze = false;
}
FrozenCleanupCheck& operator=(FrozenCleanupCheck&& other) noexcept
{
should_freeze = other.should_freeze;
other.should_freeze = false;
return *this;
}
};

// Static Allocations
Expand Down Expand Up @@ -172,19 +160,19 @@ static void Correct_Queue_range(std::vector<size_t> range)
small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
// Make vChecks here to save on malloc (this test can be slow...)
std::vector<FakeCheckCheckCompletion> vChecks;
vChecks.reserve(9);
for (const size_t i : range) {
size_t total = i;
FakeCheckCheckCompletion::n_calls = 0;
CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue.get());
while (total) {
vChecks.resize(std::min(total, (size_t) InsecureRandRange(10)));
vChecks.clear();
vChecks.resize(std::min<size_t>(total, InsecureRandRange(10)));
total -= vChecks.size();
control.Add(vChecks);
control.Add(std::move(vChecks));
}
BOOST_REQUIRE(control.Wait());
if (FakeCheckCheckCompletion::n_calls != i) {
BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
}
BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
}
small_queue->StopWorkerThreads();
}
Expand Down Expand Up @@ -241,7 +229,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
vChecks.reserve(r);
for (size_t k = 0; k < r && remaining; k++, remaining--)
vChecks.emplace_back(remaining == 1);
control.Add(vChecks);
control.Add(std::move(vChecks));
}
bool success = control.Wait();
if (i > 0) {
Expand All @@ -266,7 +254,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
std::vector<FailingCheck> vChecks;
vChecks.resize(100, false);
vChecks[99] = end_fails;
control.Add(vChecks);
control.Add(std::move(vChecks));
}
bool r =control.Wait();
BOOST_REQUIRE(r != end_fails);
Expand All @@ -292,7 +280,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
std::vector<UniqueCheck> vChecks;
for (size_t k = 0; k < r && total; k++)
vChecks.emplace_back(--total);
control.Add(vChecks);
control.Add(std::move(vChecks));
}
}
{
Expand Down Expand Up @@ -330,7 +318,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
// to catch any sort of deallocation failure
vChecks.emplace_back(total == 0 || total == i || total == i/2);
}
control.Add(vChecks);
control.Add(std::move(vChecks));
}
}
BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U);
Expand All @@ -348,11 +336,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
std::thread t0([&]() {
CCheckQueueControl<FrozenCleanupCheck> control(queue.get());
std::vector<FrozenCleanupCheck> vChecks(1);
// Freezing can't be the default initialized behavior given how the queue
// swaps in default initialized Checks (otherwise freezing destructor
// would get called twice).
vChecks[0].should_freeze = true;
control.Add(vChecks);
control.Add(std::move(vChecks));
bool waitResult = control.Wait(); // Hangs here
assert(waitResult);
});
Expand Down
12 changes: 3 additions & 9 deletions src/test/fuzz/checkqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@

namespace {
struct DumbCheck {
const bool result = false;

DumbCheck() = default;
bool result = false;

explicit DumbCheck(const bool _result) : result(_result)
{
Expand All @@ -25,10 +23,6 @@ struct DumbCheck {
{
return result;
}

void swap(DumbCheck& x) noexcept
{
}
};
} // namespace

Expand All @@ -48,15 +42,15 @@ FUZZ_TARGET(checkqueue)
checks_2.emplace_back(result);
}
if (fuzzed_data_provider.ConsumeBool()) {
check_queue_1.Add(checks_1);
check_queue_1.Add(std::move(checks_1));
}
if (fuzzed_data_provider.ConsumeBool()) {
(void)check_queue_1.Wait();
}

CCheckQueueControl<DumbCheck> check_queue_control{&check_queue_2};
if (fuzzed_data_provider.ConsumeBool()) {
check_queue_control.Add(checks_2);
check_queue_control.Add(std::move(checks_2));
}
if (fuzzed_data_provider.ConsumeBool()) {
(void)check_queue_control.Wait();
Expand Down
5 changes: 2 additions & 3 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1907,8 +1907,7 @@ bool CheckInputScripts(const CTransaction& tx, TxValidationState& state,
// Verify signature
CScriptCheck check(coin.out, tx, i, flags, cacheSigStore, &txdata);
if (pvChecks) {
pvChecks->push_back(CScriptCheck());
check.swap(pvChecks->back());
pvChecks->emplace_back(std::move(check));
} else if (!check()) {
const bool hasNonMandatoryFlags = (flags & STANDARD_NOT_MANDATORY_VERIFY_FLAGS) != 0;

Expand Down Expand Up @@ -2422,7 +2421,7 @@ bool CChainState::ConnectBlock(const CBlock& block, BlockValidationState& state,
tx.GetHash().ToString(), state.ToString());
return false;
}
control.Add(vChecks);
control.Add(std::move(vChecks));
}

CTxUndo undoDummy;
Expand Down
21 changes: 7 additions & 14 deletions src/validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,26 +336,19 @@ class CScriptCheck
unsigned int nIn;
unsigned int nFlags;
bool cacheStore;
ScriptError error;
ScriptError error{SCRIPT_ERR_UNKNOWN_ERROR};
PrecomputedTransactionData *txdata;

public:
CScriptCheck(): ptxTo(nullptr), nIn(0), nFlags(0), cacheStore(false), error(SCRIPT_ERR_UNKNOWN_ERROR) {}
CScriptCheck(const CTxOut& outIn, const CTransaction& txToIn, unsigned int nInIn, unsigned int nFlagsIn, bool cacheIn, PrecomputedTransactionData* txdataIn) :
m_tx_out(outIn), ptxTo(&txToIn), nIn(nInIn), nFlags(nFlagsIn), cacheStore(cacheIn), error(SCRIPT_ERR_UNKNOWN_ERROR), txdata(txdataIn) { }
m_tx_out(outIn), ptxTo(&txToIn), nIn(nInIn), nFlags(nFlagsIn), cacheStore(cacheIn), txdata(txdataIn) { }

bool operator()();
CScriptCheck(const CScriptCheck&) = delete;
CScriptCheck& operator=(const CScriptCheck&) = delete;
CScriptCheck(CScriptCheck&&) = default;
CScriptCheck& operator=(CScriptCheck&&) = default;

void swap(CScriptCheck& check) noexcept
{
std::swap(ptxTo, check.ptxTo);
std::swap(m_tx_out, check.m_tx_out);
std::swap(nIn, check.nIn);
std::swap(nFlags, check.nFlags);
std::swap(cacheStore, check.cacheStore);
std::swap(error, check.error);
std::swap(txdata, check.txdata);
}
bool operator()();

ScriptError GetScriptError() const { return error; }
};
Expand Down
Loading
Loading