blockchain: avoid unnecessary DB lookups when syncing

Some of the inputs for block in a span will be from other earlier
blocks in that span. Keep track of those outputs so we don't have
to look them up again after those early blocks are added to the
blockchain.
This commit is contained in:
moneromooo-monero 2018-11-10 13:46:37 +00:00
parent 58ce16d4d9
commit 756684bb28
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
2 changed files with 86 additions and 35 deletions

View file

@ -3723,7 +3723,7 @@ void Blockchain::set_enforce_dns_checkpoints(bool enforce_checkpoints)
}
//------------------------------------------------------------------
void Blockchain::block_longhash_worker(uint64_t height, const std::vector<block> &blocks, std::unordered_map<crypto::hash, crypto::hash> &map) const
void Blockchain::block_longhash_worker(uint64_t height, const epee::span<const block> &blocks, std::unordered_map<crypto::hash, crypto::hash> &map) const
{
TIME_MEASURE_START(t);
slow_hash_allocate_state();
@ -3809,11 +3809,33 @@ bool Blockchain::cleanup_handle_incoming_blocks(bool force_sync)
}
//------------------------------------------------------------------
void Blockchain::output_scan_worker(const uint64_t amount, const std::vector<uint64_t> &offsets, std::vector<output_data_t> &outputs) const
void Blockchain::output_scan_worker(const uint64_t amount, const std::vector<uint64_t> &offsets, std::vector<output_data_t> &outputs, const std::vector<output_data_t> &extra_tx_map) const
{
try
{
m_db->get_output_key(epee::span<const uint64_t>(&amount, 1), offsets, outputs, true);
if (outputs.size() < offsets.size())
{
const uint64_t n_outputs = m_db->get_num_outputs(amount);
for (size_t i = outputs.size(); i < offsets.size(); ++i)
{
uint64_t idx = offsets[i];
if (idx < n_outputs)
{
MWARNING("Index " << idx << " not found in db for amount " << amount << ", but it is less than the number of entries");
break;
}
else if (idx < n_outputs + extra_tx_map.size())
{
outputs.push_back(extra_tx_map[idx - n_outputs]);
}
else
{
MWARNING("missed " << amount << "/" << idx << " in " << extra_tx_map.size() << " (chain " << n_outputs << ")");
break;
}
}
}
}
catch (const std::exception& e)
{
@ -3928,6 +3950,34 @@ uint64_t Blockchain::prevalidate_block_hashes(uint64_t height, const std::vector
// vs [k_image, output_keys] (m_scan_table). This is faster because it takes advantage of bulk queries
// and is threaded if possible. The table (m_scan_table) will be used later when querying output
// keys.
static bool update_output_map(std::map<uint64_t, std::vector<output_data_t>> &extra_tx_map, const transaction &tx, uint64_t height, bool miner)
{
MTRACE("Blockchain::" << __func__);
for (size_t i = 0; i < tx.vout.size(); ++i)
{
const auto &out = tx.vout[i];
if (out.target.type() != typeid(txout_to_key))
continue;
const txout_to_key &out_to_key = boost::get<txout_to_key>(out.target);
rct::key commitment;
uint64_t amount = out.amount;
if (miner && tx.version == 2)
{
commitment = rct::zeroCommit(amount);
amount = 0;
}
else if (tx.version > 1)
{
CHECK_AND_ASSERT_MES(i < tx.rct_signatures.outPk.size(), false, "Invalid outPk size");
commitment = tx.rct_signatures.outPk[i].mask;
}
else
commitment = rct::zero();
extra_tx_map[amount].push_back(output_data_t{out_to_key.key, tx.unlock_time, height, commitment});
}
return true;
}
bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete_entry> &blocks_entry)
{
MTRACE("Blockchain::" << __func__);
@ -3974,42 +4024,40 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
m_blockchain_lock.lock();
}
if ((m_db->height() + blocks_entry.size()) < m_blocks_hash_check.size())
const uint64_t height = m_db->height();
if ((height + blocks_entry.size()) < m_blocks_hash_check.size())
return true;
bool blocks_exist = false;
tools::threadpool& tpool = tools::threadpool::getInstance();
uint64_t threads = tpool.get_max_concurrency();
unsigned threads = tpool.get_max_concurrency();
std::vector<block> blocks;
blocks.resize(blocks_entry.size());
if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1)
if (1)
{
// limit threads, default limit = 4
if(threads > m_max_prepare_blocks_threads)
threads = m_max_prepare_blocks_threads;
uint64_t height = m_db->height();
int batches = blocks_entry.size() / threads;
int extra = blocks_entry.size() % threads;
unsigned int batches = blocks_entry.size() / threads;
unsigned int extra = blocks_entry.size() % threads;
MDEBUG("block_batches: " << batches);
std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads);
std::vector < std::vector < block >> blocks(threads);
auto it = blocks_entry.begin();
unsigned blockidx = 0;
for (uint64_t i = 0; i < threads; i++)
for (unsigned i = 0; i < threads; i++)
{
blocks[i].reserve(batches + 1);
for (int j = 0; j < batches; j++)
for (unsigned int j = 0; j < batches; j++, ++blockidx)
{
block block;
block &block = blocks[blockidx];
if (!parse_and_validate_block_from_blob(it->block, block))
{
std::advance(it, 1);
continue;
}
return false;
// check first block and skip all blocks if its not chained properly
if (i == 0 && j == 0)
if (blockidx == 0)
{
crypto::hash tophash = m_db->top_block_hash();
if (block.prev_id != tophash)
@ -4024,20 +4072,16 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
break;
}
blocks[i].push_back(std::move(block));
std::advance(it, 1);
}
}
for (int i = 0; i < extra && !blocks_exist; i++)
for (unsigned i = 0; i < extra && !blocks_exist; i++, blockidx++)
{
block block;
block &block = blocks[blockidx];
if (!parse_and_validate_block_from_blob(it->block, block))
{
std::advance(it, 1);
continue;
}
return false;
if (have_block(get_block_hash(block)))
{
@ -4045,7 +4089,6 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
break;
}
blocks[i].push_back(std::move(block));
std::advance(it, 1);
}
@ -4054,10 +4097,13 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
m_blocks_longhash_table.clear();
uint64_t thread_height = height;
tools::threadpool::waiter waiter;
for (uint64_t i = 0; i < threads; i++)
for (unsigned int i = 0; i < threads; i++)
{
tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])), true);
thread_height += blocks[i].size();
unsigned nblocks = batches;
if (i < extra)
++nblocks;
tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, epee::span<const block>(&blocks[i], nblocks), std::ref(maps[i])), true);
thread_height += nblocks;
}
waiter.wait(&tpool);
@ -4100,7 +4146,7 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
// [input] stores all absolute_offsets for each amount
std::map<uint64_t, std::vector<uint64_t>> offset_map;
// [output] stores all output_data_t for each absolute_offset
std::map<uint64_t, std::vector<output_data_t>> tx_map;
std::map<uint64_t, std::vector<output_data_t>> tx_map, extra_tx_map;
std::vector<std::pair<cryptonote::transaction, crypto::hash>> txes(total_txs);
#define SCAN_TABLE_QUIT(m) \
@ -4111,12 +4157,14 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
} while(0); \
// generate sorted tables for all amounts and absolute offsets
size_t tx_index = 0;
size_t tx_index = 0, block_index = 0;
for (const auto &entry : blocks_entry)
{
if (m_cancel)
return false;
if (!update_output_map(extra_tx_map, blocks[block_index].miner_tx, height + block_index, true))
SCAN_TABLE_QUIT("Error building extra tx map.");
for (const auto &tx_blob : entry.txs)
{
if (tx_index >= txes.size())
@ -4175,7 +4223,10 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
offset_map[in_to_key.amount].push_back(offset);
}
if (!update_output_map(extra_tx_map, tx, height + block_index, false))
SCAN_TABLE_QUIT("Error building extra tx map.");
}
++block_index;
}
// sort and remove duplicate absolute_offsets in offset_map
@ -4198,7 +4249,7 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
for (size_t i = 0; i < amounts.size(); i++)
{
uint64_t amount = amounts[i];
tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount])), true);
tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::cref(extra_tx_map[amount])), true);
}
waiter.wait(&tpool);
}
@ -4207,7 +4258,7 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
for (size_t i = 0; i < amounts.size(); i++)
{
uint64_t amount = amounts[i];
output_scan_worker(amount, offset_map[amount], tx_map[amount]);
output_scan_worker(amount, offset_map[amount], tx_map[amount], extra_tx_map[amount]);
}
}

View file

@ -918,7 +918,7 @@ namespace cryptonote
* @param outputs return-by-reference the outputs collected
*/
void output_scan_worker(const uint64_t amount,const std::vector<uint64_t> &offsets,
std::vector<output_data_t> &outputs) const;
std::vector<output_data_t> &outputs, const std::vector<output_data_t> &extra_tx_map) const;
/**
* @brief computes the "short" and "long" hashes for a set of blocks
@ -927,7 +927,7 @@ namespace cryptonote
* @param blocks the blocks to be hashed
* @param map return-by-reference the hashes for each block
*/
void block_longhash_worker(uint64_t height, const std::vector<block> &blocks,
void block_longhash_worker(uint64_t height, const epee::span<const block> &blocks,
std::unordered_map<crypto::hash, crypto::hash> &map) const;
/**