From ba8331ce417a6b6e49a54712d1e627452d6e7e4b Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Tue, 17 Apr 2018 18:16:19 +0100 Subject: [PATCH] wallet2: parse blocks in the RPC thread, not the processing thread Processing typically is the bottleneck --- src/wallet/wallet2.cpp | 73 ++++++++++++++++++++++++------------------ src/wallet/wallet2.h | 12 +++++-- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index 9fdcde93f..57f75e3ec 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -1720,39 +1720,23 @@ void wallet2::pull_hashes(uint64_t start_height, uint64_t &blocks_start_height, hashes = std::move(res.m_block_ids); } //---------------------------------------------------------------------------------------------------- -void wallet2::process_blocks(uint64_t start_height, const std::vector &blocks, const std::vector &o_indices, uint64_t& blocks_added) +void wallet2::process_parsed_blocks(uint64_t start_height, const std::vector &blocks, const std::vector &parsed_blocks, uint64_t& blocks_added) { size_t current_index = start_height; blocks_added = 0; size_t tx_o_indices_idx = 0; - THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "size mismatch"); + THROW_WALLET_EXCEPTION_IF(blocks.size() != parsed_blocks.size(), error::wallet_internal_error, "size mismatch"); THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(current_index), error::wallet_internal_error, "Index out of bounds of hashchain"); - std::vector round_block_hashes(blocks.size()); - std::vector round_blocks(blocks.size()); - std::deque error(blocks.size()); - - tools::threadpool& tpool = tools::threadpool::getInstance(); - tools::threadpool::waiter waiter; for (size_t i = 0; i < blocks.size(); ++i) { - tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block), - std::ref(round_blocks[i]), std::ref(round_block_hashes[i]), std::ref(error[i]))); - } - waiter.wait(); - for (size_t i = 0; i < blocks.size(); ++i) - { - THROW_WALLET_EXCEPTION_IF(error[i], error::block_parse_error, blocks[i].block); - } - for (size_t i = 0; i < blocks.size(); ++i) - { - const crypto::hash &bl_id = round_block_hashes[i]; - cryptonote::block &bl = round_blocks[i]; + const crypto::hash &bl_id = parsed_blocks[i].hash; + const cryptonote::block &bl = parsed_blocks[i].block; if(current_index >= m_blockchain.size()) { - process_new_blockchain_entry(bl, blocks[i], bl_id, current_index, o_indices[i]); + process_new_blockchain_entry(bl, blocks[i], bl_id, current_index, parsed_blocks[i].o_indices); ++blocks_added; } else if(bl_id != m_blockchain[current_index]) @@ -1764,7 +1748,7 @@ void wallet2::process_blocks(uint64_t start_height, const std::vector &short_chain_history, const std::vector &prev_blocks, std::vector &blocks, std::vector &o_indices, bool &error) +void wallet2::pull_and_parse_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &short_chain_history, const std::vector &prev_blocks, std::vector &blocks, std::vector &parsed_blocks, bool &error) { error = false; @@ -1806,7 +1790,28 @@ void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_hei } // pull the new blocks + std::vector o_indices; pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices); + THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "Mismatched sizes of blocks and o_indices"); + + tools::threadpool& tpool = tools::threadpool::getInstance(); + tools::threadpool::waiter waiter; + parsed_blocks.resize(blocks.size()); + for (size_t i = 0; i < blocks.size(); ++i) + { + tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block), + std::ref(parsed_blocks[i].block), std::ref(parsed_blocks[i].hash), std::ref(parsed_blocks[i].error))); + } + waiter.wait(); + for (size_t i = 0; i < blocks.size(); ++i) + { + if (parsed_blocks[i].error) + { + error = true; + break; + } + parsed_blocks[i].o_indices = std::move(o_indices[i]); + } } catch(...) { @@ -2197,7 +2202,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re tools::threadpool::waiter waiter; uint64_t blocks_start_height; std::vector blocks; - std::vector o_indices; + std::vector parsed_blocks; bool refreshed = false; // pull the first set of blocks @@ -2218,11 +2223,11 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re // If stop() is called during fast refresh we don't need to continue if(!m_run.load(std::memory_order_relaxed)) return; - pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices); // always reset start_height to 0 to force short_chain_ history to be used on // subsequent pulls in this refresh. start_height = 0; + bool first = true; while(m_run.load(std::memory_order_relaxed)) { try @@ -2230,19 +2235,22 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re // pull the next set of blocks while we're processing the current one uint64_t next_blocks_start_height; std::vector next_blocks; - std::vector next_o_indices; + std::vector next_parsed_blocks; bool error = false; - if (blocks.empty()) + if (!first && blocks.empty()) { refreshed = false; break; } - tpool.submit(&waiter, [&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);}); + tpool.submit(&waiter, [&]{pull_and_parse_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_parsed_blocks, error);}); - process_blocks(blocks_start_height, blocks, o_indices, added_blocks); - blocks_fetched += added_blocks; + if (!first) + { + process_parsed_blocks(blocks_start_height, blocks, parsed_blocks, added_blocks); + blocks_fetched += added_blocks; + } waiter.wait(); - if(blocks_start_height == next_blocks_start_height) + if(!first && blocks_start_height == next_blocks_start_height) { m_node_rpc_proxy.set_height(m_blockchain.size()); refreshed = true; @@ -2252,7 +2260,8 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re // switch to the new blocks from the daemon blocks_start_height = next_blocks_start_height; blocks = std::move(next_blocks); - o_indices = next_o_indices; + parsed_blocks = std::move(next_parsed_blocks); + first = false; // handle error from async fetching thread if (error) diff --git a/src/wallet/wallet2.h b/src/wallet/wallet2.h index 3a1f555d7..8f036a707 100644 --- a/src/wallet/wallet2.h +++ b/src/wallet/wallet2.h @@ -452,6 +452,14 @@ namespace tools typedef std::tuple get_outs_entry; + struct parsed_block + { + crypto::hash hash; + cryptonote::block block; + cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices o_indices; + bool error; + }; + /*! * \brief Generates a wallet or restores one. * \param wallet_ Name of wallet file @@ -1126,8 +1134,8 @@ namespace tools void pull_blocks(uint64_t start_height, uint64_t& blocks_start_height, const std::list &short_chain_history, std::vector &blocks, std::vector &o_indices); void pull_hashes(uint64_t start_height, uint64_t& blocks_start_height, const std::list &short_chain_history, std::vector &hashes); void fast_refresh(uint64_t stop_height, uint64_t &blocks_start_height, std::list &short_chain_history); - void pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &short_chain_history, const std::vector &prev_blocks, std::vector &blocks, std::vector &o_indices, bool &error); - void process_blocks(uint64_t start_height, const std::vector &blocks, const std::vector &o_indices, uint64_t& blocks_added); + void pull_and_parse_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &short_chain_history, const std::vector &prev_blocks, std::vector &blocks, std::vector &parsed_blocks, bool &error); + void process_parsed_blocks(uint64_t start_height, const std::vector &blocks, const std::vector &parsed_blocks, uint64_t& blocks_added); uint64_t select_transfers(uint64_t needed_money, std::vector unused_transfers_indices, std::vector& selected_transfers, bool trusted_daemon) const; bool prepare_file_names(const std::string& file_path); void process_unconfirmed(const crypto::hash &txid, const cryptonote::transaction& tx, uint64_t height);