Second thread pool for IO

This commit is contained in:
SChernykh 2022-06-14 21:23:23 +02:00
parent 7cbae6ca98
commit 6adf03cdc5
8 changed files with 17 additions and 13 deletions

View File

@ -521,7 +521,7 @@ bool load_txt_records_from_dns(std::vector<std::string> &good_records, const std
// send all requests in parallel // send all requests in parallel
std::deque<bool> avail(dns_urls.size(), false), valid(dns_urls.size(), false); std::deque<bool> avail(dns_urls.size(), false), valid(dns_urls.size(), false);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForIO();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
for (size_t n = 0; n < dns_urls.size(); ++n) for (size_t n = 0; n < dns_urls.size(); ++n)
{ {

View File

@ -42,10 +42,14 @@ namespace tools
class threadpool class threadpool
{ {
public: public:
static threadpool& getInstance() { static threadpool& getInstanceForCompute() {
static threadpool instance; static threadpool instance;
return instance; return instance;
} }
static threadpool& getInstanceForIO() {
static threadpool instance(8);
return instance;
}
static threadpool *getNewForUnitTests(unsigned max_threads = 0) { static threadpool *getNewForUnitTests(unsigned max_threads = 0) {
return new threadpool(max_threads); return new threadpool(max_threads);
} }

View File

@ -3434,7 +3434,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
std::vector < uint64_t > results; std::vector < uint64_t > results;
results.resize(tx.vin.size(), 0); results.resize(tx.vin.size(), 0);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
int threads = tpool.get_max_concurrency(); int threads = tpool.get_max_concurrency();
@ -5137,7 +5137,7 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
return true; return true;
bool blocks_exist = false; bool blocks_exist = false;
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
unsigned threads = tpool.get_max_concurrency(); unsigned threads = tpool.get_max_concurrency();
blocks.resize(blocks_entry.size()); blocks.resize(blocks_entry.size());

View File

@ -1013,7 +1013,7 @@ namespace cryptonote
CRITICAL_REGION_LOCAL(m_incoming_tx_lock); CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
epee::span<tx_blob_entry>::const_iterator it = tx_blobs.begin(); epee::span<tx_blob_entry>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {

View File

@ -1333,7 +1333,7 @@ namespace rct {
try try
{ {
if (semantics) { if (semantics) {
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
std::deque<bool> results(rv.outPk.size(), false); std::deque<bool> results(rv.outPk.size(), false);
DP("range proofs verified?"); DP("range proofs verified?");
@ -1383,7 +1383,7 @@ namespace rct {
{ {
PERF_TIMER(verRctSemanticsSimple); PERF_TIMER(verRctSemanticsSimple);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
std::deque<bool> results; std::deque<bool> results;
std::vector<const Bulletproof*> bp_proofs; std::vector<const Bulletproof*> bp_proofs;
@ -1536,7 +1536,7 @@ namespace rct {
const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size()); const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size());
std::deque<bool> results(threads); std::deque<bool> results(threads);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
const keyV &pseudoOuts = bulletproof || bulletproof_plus ? rv.p.pseudoOuts : rv.pseudoOuts; const keyV &pseudoOuts = bulletproof || bulletproof_plus ? rv.p.pseudoOuts : rv.pseudoOuts;

View File

@ -2724,7 +2724,7 @@ void wallet2::process_parsed_blocks(uint64_t start_height, const std::vector<cry
THROW_WALLET_EXCEPTION_IF(blocks.size() != parsed_blocks.size(), error::wallet_internal_error, "size mismatch"); THROW_WALLET_EXCEPTION_IF(blocks.size() != parsed_blocks.size(), error::wallet_internal_error, "size mismatch");
THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(current_index), error::out_of_hashchain_bounds_error); THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(current_index), error::out_of_hashchain_bounds_error);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
size_t num_txes = 0; size_t num_txes = 0;
@ -2967,7 +2967,7 @@ void wallet2::pull_and_parse_next_blocks(uint64_t start_height, uint64_t &blocks
pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices, current_height); pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices, current_height);
THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "Mismatched sizes of blocks and o_indices"); THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "Mismatched sizes of blocks and o_indices");
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
parsed_blocks.resize(blocks.size()); parsed_blocks.resize(blocks.size());
for (size_t i = 0; i < blocks.size(); ++i) for (size_t i = 0; i < blocks.size(); ++i)
@ -3464,7 +3464,7 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo
size_t try_count = 0; size_t try_count = 0;
crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash; crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash;
std::list<crypto::hash> short_chain_history; std::list<crypto::hash> short_chain_history;
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
uint64_t blocks_start_height; uint64_t blocks_start_height;
std::vector<cryptonote::block_complete_entry> blocks; std::vector<cryptonote::block_complete_entry> blocks;

View File

@ -144,7 +144,7 @@ bool wallet2::search_for_rpc_payment(uint64_t credits_target, uint32_t n_threads
n_threads = boost::thread::hardware_concurrency(); n_threads = boost::thread::hardware_concurrency();
std::vector<crypto::hash> hash(n_threads); std::vector<crypto::hash> hash(n_threads);
tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool); tools::threadpool::waiter waiter(tpool);
const uint32_t local_nonce = nonce += n_threads; // wrapping's OK const uint32_t local_nonce = nonce += n_threads; // wrapping's OK

View File

@ -778,7 +778,7 @@ inline bool do_replay_events_get_core(std::vector<test_event_entry>& events, cry
t_test_class validator; t_test_class validator;
bool ret = replay_events_through_core<t_test_class>(c, events, validator); bool ret = replay_events_through_core<t_test_class>(c, events, validator);
tools::threadpool::getInstance().recycle(); tools::threadpool::getInstanceForCompute().recycle();
// c.deinit(); // c.deinit();
return ret; return ret;
} }