Merge pull request #504

9ee48e9 wallet2: speed up wallet refresh for large miners (moneromooo-monero)
4905903 wallet2: parallelize pulling blocks and processing them on refresh (moneromooo-monero)
d0eaf1d wallet2: maintain the short chain manually when refreshing (moneromooo-monero)
a4e9506 wallet2: split pull blocks between pulling and processing (moneromooo-monero)
This commit is contained in:
Riccardo Spagni 2015-11-30 00:17:31 +02:00
commit 5c3b29792c
No known key found for this signature in database
GPG key ID: 55432DF31CCD4FCD
2 changed files with 118 additions and 26 deletions

View file

@ -203,31 +203,64 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_
tx_pub_key = pub_key_field.pub_key; tx_pub_key = pub_key_field.pub_key;
bool r = true; bool r = true;
int threads; int threads = std::thread::hardware_concurrency();
if (miner_tx && m_refresh_type == RefreshNoCoinbase) if (miner_tx && m_refresh_type == RefreshNoCoinbase)
{ {
// assume coinbase isn't for us // assume coinbase isn't for us
} }
else if (miner_tx && m_refresh_type == RefreshOptimizeCoinbase) else if (miner_tx && m_refresh_type == RefreshOptimizeCoinbase)
{
for (size_t i = 0; i < tx.vout.size(); ++i)
{ {
uint64_t money_transfered = 0; uint64_t money_transfered = 0;
bool error = false; bool error = false;
check_acc_out(m_account.get_keys(), tx.vout[i], tx_pub_key, i, money_transfered, error); check_acc_out(m_account.get_keys(), tx.vout[0], tx_pub_key, 0, money_transfered, error);
if (error) if (error)
{
r = false;
}
else
{
// this assumes that the miner tx pays a single address
if (money_transfered > 0)
{
outs.push_back(0);
tx_money_got_in_outs = money_transfered;
// process the other outs from that tx
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::auto_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
for (int i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
const account_keys &keys = m_account.get_keys();
std::vector<uint64_t> money_transfered(tx.vout.size());
std::deque<bool> error(tx.vout.size());
// the first one was already checked
for (size_t i = 1; i < tx.vout.size(); ++i)
{
ioservice.dispatch(boost::bind(&wallet2::check_acc_out, this, std::cref(keys), std::cref(tx.vout[i]), std::cref(tx_pub_key), i,
std::ref(money_transfered[i]), std::ref(error[i])));
}
KILL_IOSERVICE();
for (size_t i = 1; i < tx.vout.size(); ++i)
{
if (error[i])
{ {
r = false; r = false;
break; break;
} }
// this assumes that the miner tx pays a single address if (money_transfered[i])
if (money_transfered == 0) {
break;
outs.push_back(i); outs.push_back(i);
tx_money_got_in_outs += money_transfered; tx_money_got_in_outs += money_transfered[i];
} }
} }
else if (tx.vout.size() > 1 && (threads = std::thread::hardware_concurrency()) > 1) }
}
}
else if (tx.vout.size() > 1 && threads > 1)
{ {
boost::asio::io_service ioservice; boost::asio::io_service ioservice;
boost::thread_group threadpool; boost::thread_group threadpool;
@ -274,7 +307,9 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_
cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res = AUTO_VAL_INIT(res); cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res = AUTO_VAL_INIT(res);
req.txid = get_transaction_hash(tx); req.txid = get_transaction_hash(tx);
m_daemon_rpc_mutex.lock();
bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/get_o_indexes.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/get_o_indexes.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT);
m_daemon_rpc_mutex.unlock();
THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "get_o_indexes.bin");
THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "get_o_indexes.bin");
THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_out_indices_error, res.status); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_out_indices_error, res.status);
@ -478,19 +513,28 @@ void wallet2::parse_block_round(const cryptonote::blobdata &blob, cryptonote::bl
bl_id = get_block_hash(bl); bl_id = get_block_hash(bl);
} }
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------
void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added) void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::list<cryptonote::block_complete_entry> &blocks)
{ {
blocks_added = 0;
cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response res = AUTO_VAL_INIT(res); cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response res = AUTO_VAL_INIT(res);
get_short_chain_history(req.block_ids); req.block_ids = short_chain_history;
req.start_height = start_height; req.start_height = start_height;
m_daemon_rpc_mutex.lock();
bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT);
m_daemon_rpc_mutex.unlock();
THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getblocks.bin");
THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getblocks.bin");
THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_blocks_error, res.status); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_blocks_error, res.status);
size_t current_index = res.start_height; blocks_start_height = res.start_height;
blocks = res.blocks;
}
//----------------------------------------------------------------------------------------------------
void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote::block_complete_entry> &blocks, uint64_t& blocks_added)
{
size_t current_index = start_height;
blocks_added = 0;
int threads = std::thread::hardware_concurrency(); int threads = std::thread::hardware_concurrency();
if (threads > 1) if (threads > 1)
@ -498,7 +542,6 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added)
std::vector<crypto::hash> round_block_hashes(threads); std::vector<crypto::hash> round_block_hashes(threads);
std::vector<cryptonote::block> round_blocks(threads); std::vector<cryptonote::block> round_blocks(threads);
std::deque<bool> error(threads); std::deque<bool> error(threads);
const std::list<block_complete_entry> &blocks = res.blocks;
size_t blocks_size = blocks.size(); size_t blocks_size = blocks.size();
std::list<block_complete_entry>::const_iterator blocki = blocks.begin(); std::list<block_complete_entry>::const_iterator blocki = blocks.begin();
for (size_t b = 0; b < blocks_size; b += threads) for (size_t b = 0; b < blocks_size; b += threads)
@ -559,10 +602,10 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added)
} }
else else
{ {
BOOST_FOREACH(auto& bl_entry, res.blocks) BOOST_FOREACH(auto& bl_entry, blocks)
{ {
cryptonote::block bl; cryptonote::block bl;
r = cryptonote::parse_and_validate_block_from_blob(bl_entry.block, bl); bool r = cryptonote::parse_and_validate_block_from_blob(bl_entry.block, bl);
THROW_WALLET_EXCEPTION_IF(!r, error::block_parse_error, bl_entry.block); THROW_WALLET_EXCEPTION_IF(!r, error::block_parse_error, bl_entry.block);
crypto::hash bl_id = get_block_hash(bl); crypto::hash bl_id = get_block_hash(bl);
@ -574,9 +617,9 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t& blocks_added)
else if(bl_id != m_blockchain[current_index]) else if(bl_id != m_blockchain[current_index])
{ {
//split detected here !!! //split detected here !!!
THROW_WALLET_EXCEPTION_IF(current_index == res.start_height, error::wallet_internal_error, THROW_WALLET_EXCEPTION_IF(current_index == start_height, error::wallet_internal_error,
"wrong daemon response: split starts from the first block in response " + string_tools::pod_to_hex(bl_id) + "wrong daemon response: split starts from the first block in response " + string_tools::pod_to_hex(bl_id) +
" (height " + std::to_string(res.start_height) + "), local block id at this height: " + " (height " + std::to_string(start_height) + "), local block id at this height: " +
string_tools::pod_to_hex(m_blockchain[current_index])); string_tools::pod_to_hex(m_blockchain[current_index]));
detach_blockchain(current_index); detach_blockchain(current_index);
@ -604,6 +647,23 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched)
refresh(start_height, blocks_fetched, received_money); refresh(start_height, blocks_fetched, received_money);
} }
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------
void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::list<cryptonote::block_complete_entry> &prev_blocks, std::list<cryptonote::block_complete_entry> &blocks)
{
// prepend the last 3 blocks, should be enough to guard against a block or two's reorg
cryptonote::block bl;
std::list<cryptonote::block_complete_entry>::const_reverse_iterator i = prev_blocks.rbegin();
for (size_t n = 0; n < std::min((size_t)3, prev_blocks.size()); ++n)
{
bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl);
THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block);
short_chain_history.push_front(cryptonote::get_block_hash(bl));
++i;
}
// pull the new blocks
pull_blocks(start_height, blocks_start_height, short_chain_history, blocks);
}
//----------------------------------------------------------------------------------------------------
void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& received_money) void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& received_money)
{ {
received_money = false; received_money = false;
@ -611,15 +671,33 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
uint64_t added_blocks = 0; uint64_t added_blocks = 0;
size_t try_count = 0; size_t try_count = 0;
crypto::hash last_tx_hash_id = m_transfers.size() ? get_transaction_hash(m_transfers.back().m_tx) : null_hash; crypto::hash last_tx_hash_id = m_transfers.size() ? get_transaction_hash(m_transfers.back().m_tx) : null_hash;
std::list<crypto::hash> short_chain_history;
std::thread pull_thread;
uint64_t blocks_start_height;
std::list<cryptonote::block_complete_entry> blocks;
// pull the first set of blocks
get_short_chain_history(short_chain_history);
pull_blocks(start_height, blocks_start_height, short_chain_history, blocks);
while(m_run.load(std::memory_order_relaxed)) while(m_run.load(std::memory_order_relaxed))
{ {
try try
{ {
pull_blocks(start_height, added_blocks); // pull the next set of blocks while we're processing the current one
uint64_t next_blocks_start_height;
std::list<cryptonote::block_complete_entry> next_blocks;
pull_thread = std::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks);});
process_blocks(blocks_start_height, blocks, added_blocks);
blocks_fetched += added_blocks; blocks_fetched += added_blocks;
pull_thread.join();
if(!added_blocks) if(!added_blocks)
break; break;
// switch to the new blocks from the daemon
blocks_start_height = next_blocks_start_height;
blocks = next_blocks;
} }
catch (const std::exception&) catch (const std::exception&)
{ {
@ -1050,6 +1128,8 @@ bool wallet2::prepare_file_names(const std::string& file_path)
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------
bool wallet2::check_connection() bool wallet2::check_connection()
{ {
std::lock_guard<std::mutex> lock(m_daemon_rpc_mutex);
if(m_http_client.is_connected()) if(m_http_client.is_connected())
return true; return true;
@ -1262,7 +1342,9 @@ void wallet2::rescan_spent()
COMMAND_RPC_IS_KEY_IMAGE_SPENT::request req = AUTO_VAL_INIT(req); COMMAND_RPC_IS_KEY_IMAGE_SPENT::request req = AUTO_VAL_INIT(req);
COMMAND_RPC_IS_KEY_IMAGE_SPENT::response daemon_resp = AUTO_VAL_INIT(daemon_resp); COMMAND_RPC_IS_KEY_IMAGE_SPENT::response daemon_resp = AUTO_VAL_INIT(daemon_resp);
req.key_images = key_images; req.key_images = key_images;
m_daemon_rpc_mutex.lock();
bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/is_key_image_spent", req, daemon_resp, m_http_client, 200000); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/is_key_image_spent", req, daemon_resp, m_http_client, 200000);
m_daemon_rpc_mutex.unlock();
THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "is_key_image_spent");
THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "is_key_image_spent");
THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::is_key_image_spent_error, daemon_resp.status); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::is_key_image_spent_error, daemon_resp.status);
@ -1569,7 +1651,9 @@ void wallet2::commit_tx(pending_tx& ptx)
COMMAND_RPC_SEND_RAW_TX::request req; COMMAND_RPC_SEND_RAW_TX::request req;
req.tx_as_hex = epee::string_tools::buff_to_hex_nodelimer(tx_to_blob(ptx.tx)); req.tx_as_hex = epee::string_tools::buff_to_hex_nodelimer(tx_to_blob(ptx.tx));
COMMAND_RPC_SEND_RAW_TX::response daemon_send_resp; COMMAND_RPC_SEND_RAW_TX::response daemon_send_resp;
m_daemon_rpc_mutex.lock();
bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/sendrawtransaction", req, daemon_send_resp, m_http_client, 200000); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/sendrawtransaction", req, daemon_send_resp, m_http_client, 200000);
m_daemon_rpc_mutex.unlock();
THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "sendrawtransaction");
THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "sendrawtransaction");
THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status != CORE_RPC_STATUS_OK, error::tx_rejected, ptx.tx, daemon_send_resp.status); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status != CORE_RPC_STATUS_OK, error::tx_rejected, ptx.tx, daemon_send_resp.status);
@ -1757,7 +1841,9 @@ void wallet2::transfer_selected(const std::vector<cryptonote::tx_destination_ent
req.amounts.push_back(it->amount()); req.amounts.push_back(it->amount());
} }
m_daemon_rpc_mutex.lock();
bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000);
m_daemon_rpc_mutex.unlock();
THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin");
THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin");
THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status);

View file

@ -355,7 +355,9 @@ namespace tools
bool is_tx_spendtime_unlocked(uint64_t unlock_time) const; bool is_tx_spendtime_unlocked(uint64_t unlock_time) const;
bool is_transfer_unlocked(const transfer_details& td) const; bool is_transfer_unlocked(const transfer_details& td) const;
bool clear(); bool clear();
void pull_blocks(uint64_t start_height, uint64_t& blocks_added); void pull_blocks(uint64_t start_height, uint64_t& blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::list<cryptonote::block_complete_entry> &blocks);
void pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::list<cryptonote::block_complete_entry> &prev_blocks, std::list<cryptonote::block_complete_entry> &blocks);
void process_blocks(uint64_t start_height, const std::list<cryptonote::block_complete_entry> &blocks, uint64_t& blocks_added);
uint64_t select_transfers(uint64_t needed_money, bool add_dust, uint64_t dust, std::list<transfer_container::iterator>& selected_transfers); uint64_t select_transfers(uint64_t needed_money, bool add_dust, uint64_t dust, std::list<transfer_container::iterator>& selected_transfers);
bool prepare_file_names(const std::string& file_path); bool prepare_file_names(const std::string& file_path);
void process_unconfirmed(const cryptonote::transaction& tx, uint64_t height); void process_unconfirmed(const cryptonote::transaction& tx, uint64_t height);
@ -387,6 +389,8 @@ namespace tools
std::atomic<bool> m_run; std::atomic<bool> m_run;
std::mutex m_daemon_rpc_mutex;
i_wallet2_callback* m_callback; i_wallet2_callback* m_callback;
bool m_testnet; bool m_testnet;
bool m_restricted; bool m_restricted;
@ -563,7 +567,9 @@ namespace tools
req.amounts.push_back(it->amount()); req.amounts.push_back(it->amount());
} }
m_daemon_rpc_mutex.lock();
bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000);
m_daemon_rpc_mutex.unlock();
THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin");
THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin");
THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status);