Hack for read/write txn mixup

save the thread ID of the writer thread so we don't try to use
the writetxn from reader threads
This commit is contained in:
Howard Chu 2016-03-14 18:26:15 +00:00
parent b28258a1c6
commit 92dd4ec6d6
2 changed files with 31 additions and 42 deletions

View File

@ -203,10 +203,12 @@ inline void lmdb_db_open(MDB_txn* txn, const char* name, int flags, MDB_dbi& dbi
int result = mdb_cursor_open(m_txn, m_ ## name, (MDB_cursor **)&m_cur_ ## name); \
if (result) \
throw0(DB_ERROR(lmdb_error("Failed to open cursor: ", result).c_str())); \
if (!m_write_txn) \
if (m_cursors != &m_wcursors) \
m_tinfo->m_ti_rflags.m_rf_ ## name = true; \
} else if (!m_write_txn && !m_tinfo->m_ti_rflags.m_rf_ ## name) { \
mdb_cursor_renew(m_txn, m_cur_ ## name); \
} else if (m_cursors != &m_wcursors && !m_tinfo->m_ti_rflags.m_rf_ ## name) { \
int result = mdb_cursor_renew(m_txn, m_cur_ ## name); \
if (result) \
throw0(DB_ERROR(lmdb_error("Failed to renew cursor: ", result).c_str())); \
m_tinfo->m_ti_rflags.m_rf_ ## name = true; \
}
@ -1299,8 +1301,9 @@ void BlockchainLMDB::unlock()
} \
#define TXN_PREFIX_RDONLY() \
bool my_rtxn = block_rtxn_start(); \
MDB_txn *m_txn = m_write_txn ? m_write_txn->m_txn : m_tinfo->m_ti_rtxn
MDB_txn *m_txn; \
mdb_txn_cursors *m_cursors; \
bool my_rtxn = block_rtxn_start(&m_txn, &m_cursors);
#define TXN_POSTFIX_RDONLY() \
if (my_rtxn) block_rtxn_stop()
@ -1342,7 +1345,6 @@ bool BlockchainLMDB::block_exists(const crypto::hash& h) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_heights);
MDB_val_copy<crypto::hash> key(h);
@ -1374,7 +1376,6 @@ uint64_t BlockchainLMDB::get_block_height(const crypto::hash& h) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_heights);
MDB_val_copy<crypto::hash> key(h);
@ -1405,7 +1406,6 @@ block BlockchainLMDB::get_block_from_height(const uint64_t& height) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(blocks);
MDB_val_copy<uint64_t> key(height);
@ -1436,7 +1436,6 @@ uint64_t BlockchainLMDB::get_block_timestamp(const uint64_t& height) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_timestamps);
MDB_val_copy<uint64_t> key(height);
@ -1474,7 +1473,6 @@ size_t BlockchainLMDB::get_block_size(const uint64_t& height) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_sizes);
MDB_val_copy<uint64_t> key(height);
@ -1498,7 +1496,6 @@ difficulty_type BlockchainLMDB::get_block_cumulative_difficulty(const uint64_t&
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_diffs);
MDB_val_copy<uint64_t> key(height);
@ -1539,7 +1536,6 @@ uint64_t BlockchainLMDB::get_block_already_generated_coins(const uint64_t& heigh
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_coins);
MDB_val_copy<uint64_t> key(height);
@ -1563,7 +1559,6 @@ crypto::hash BlockchainLMDB::get_block_hash_from_height(const uint64_t& height)
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(block_hashes);
MDB_val_copy<uint64_t> key(height);
@ -1649,7 +1644,6 @@ bool BlockchainLMDB::tx_exists(const crypto::hash& h) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(txs);
MDB_val_copy<crypto::hash> key(h);
@ -1679,7 +1673,6 @@ uint64_t BlockchainLMDB::get_tx_unlock_time(const crypto::hash& h) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(tx_unlocks);
MDB_val_copy<crypto::hash> key(h);
@ -1701,7 +1694,6 @@ transaction BlockchainLMDB::get_tx(const crypto::hash& h) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(txs);
MDB_val_copy<crypto::hash> key(h);
@ -1760,7 +1752,6 @@ uint64_t BlockchainLMDB::get_tx_block_height(const crypto::hash& h) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(tx_heights);
MDB_val_copy<crypto::hash> key(h);
@ -1784,7 +1775,6 @@ uint64_t BlockchainLMDB::get_num_outputs(const uint64_t& amount) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_amounts);
MDB_val_copy<uint64_t> k(amount);
@ -1812,7 +1802,6 @@ output_data_t BlockchainLMDB::get_output_key(const uint64_t &global_index) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_keys);
MDB_val_copy<uint64_t> k(global_index);
@ -1842,7 +1831,6 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index_from_global(const uint64_t&
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_txs);
RCURSOR(output_indices);
@ -1888,7 +1876,6 @@ std::vector<uint64_t> BlockchainLMDB::get_tx_output_indices(const crypto::hash&
std::vector<uint64_t> index_vec;
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(tx_outputs);
MDB_val_copy<crypto::hash> k(h);
@ -1930,7 +1917,6 @@ std::vector<uint64_t> BlockchainLMDB::get_tx_amount_output_indices(const crypto:
transaction tx = get_tx(h);
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_amounts);
uint64_t i = 0;
@ -1997,7 +1983,6 @@ bool BlockchainLMDB::has_key_image(const crypto::key_image& img) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(spent_keys);
MDB_val_copy<crypto::key_image> val_key(img);
@ -2017,7 +2002,6 @@ bool BlockchainLMDB::for_all_key_images(std::function<bool(const crypto::key_ima
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(spent_keys);
MDB_val k;
@ -2051,7 +2035,6 @@ bool BlockchainLMDB::for_all_blocks(std::function<bool(uint64_t, const crypto::h
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(blocks);
MDB_val k;
@ -2093,7 +2076,6 @@ bool BlockchainLMDB::for_all_transactions(std::function<bool(const crypto::hash&
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(txs);
MDB_val k;
@ -2132,7 +2114,6 @@ bool BlockchainLMDB::for_all_outputs(std::function<bool(uint64_t amount, const c
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_amounts);
MDB_val k;
@ -2176,6 +2157,7 @@ void BlockchainLMDB::batch_start(uint64_t batch_num_blocks)
throw0(DB_ERROR("batch transaction attempted, but m_write_txn already in use"));
check_open();
m_writer = boost::this_thread::get_id();
check_and_resize_for_batch(batch_num_blocks);
m_write_batch_txn = new mdb_txn_safe();
@ -2218,6 +2200,7 @@ void BlockchainLMDB::batch_commit()
m_write_txn = nullptr;
delete m_write_batch_txn;
m_write_batch_txn = nullptr;
memset(&m_wcursors, 0, sizeof(m_wcursors));
}
@ -2257,8 +2240,9 @@ void BlockchainLMDB::batch_abort()
m_write_txn = nullptr;
// explicitly call in case mdb_env_close() (BlockchainLMDB::close()) called before BlockchainLMDB destructor called.
m_write_batch_txn->abort();
m_batch_active = false;
delete m_write_batch_txn;
m_write_batch_txn = nullptr;
m_batch_active = false;
memset(&m_wcursors, 0, sizeof(m_wcursors));
LOG_PRINT_L3("batch transaction: aborted");
}
@ -2271,10 +2255,14 @@ void BlockchainLMDB::set_batch_transactions(bool batch_transactions)
}
// return true if we started the txn, false if already started
bool BlockchainLMDB::block_rtxn_start() const
bool BlockchainLMDB::block_rtxn_start(MDB_txn **mtxn, mdb_txn_cursors **mcur) const
{
if (m_write_txn)
return false;
bool ret = false;
if (m_write_txn && m_writer == boost::this_thread::get_id()) {
*mtxn = m_write_txn->m_txn;
*mcur = (mdb_txn_cursors *)&m_wcursors;
return ret;
}
if (!m_tinfo.get())
{
m_tinfo.reset(new mdb_threadinfo);
@ -2282,17 +2270,20 @@ bool BlockchainLMDB::block_rtxn_start() const
memset(&m_tinfo->m_ti_rflags, 0, sizeof(m_tinfo->m_ti_rflags));
if (auto mdb_res = mdb_txn_begin(m_env, NULL, MDB_RDONLY, &m_tinfo->m_ti_rtxn))
throw0(DB_ERROR_TXN_START(lmdb_error("Failed to create a read transaction for the db: ", mdb_res).c_str()));
ret = true;
} else if (!m_tinfo->m_ti_rflags.m_rf_txn)
{
if (auto mdb_res = mdb_txn_renew(m_tinfo->m_ti_rtxn))
throw0(DB_ERROR_TXN_START(lmdb_error("Failed to renew a read transaction for the db: ", mdb_res).c_str()));
} else
{
return false;
ret = true;
}
m_tinfo->m_ti_rflags.m_rf_txn = true;
if (ret)
m_tinfo->m_ti_rflags.m_rf_txn = true;
*mtxn = m_tinfo->m_ti_rtxn;
*mcur = &m_tinfo->m_ti_rcursors;
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
return true;
return ret;
}
void BlockchainLMDB::block_rtxn_stop() const
@ -2307,7 +2298,7 @@ void BlockchainLMDB::block_txn_start(bool readonly)
if (readonly)
{
bool didit = false;
if (m_write_txn)
if (m_write_txn && m_writer == boost::this_thread::get_id())
return;
if (!m_tinfo.get())
{
@ -2343,6 +2334,7 @@ void BlockchainLMDB::block_txn_start(bool readonly)
throw0(DB_ERROR_TXN_START((std::string("Attempted to start new write txn when write txn already exists in ")+__FUNCTION__).c_str()));
if (! m_batch_active)
{
m_writer = boost::this_thread::get_id();
m_write_txn = new mdb_txn_safe();
if (auto mdb_res = mdb_txn_begin(m_env, NULL, 0, *m_write_txn))
{
@ -2471,7 +2463,6 @@ void BlockchainLMDB::get_output_tx_and_index_from_global(const std::vector<uint6
tx_out_indices.clear();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_txs);
RCURSOR(output_indices);
@ -2517,7 +2508,6 @@ void BlockchainLMDB::get_output_global_indices(const uint64_t& amount, const std
}
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(output_amounts);
MDB_val_copy<uint64_t> k(amount);
@ -2637,7 +2627,6 @@ void BlockchainLMDB::get_output_key(const uint64_t &amount, const std::vector<ui
outputs.clear();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
std::vector <uint64_t> global_indices;
get_output_global_indices(amount, offsets, global_indices);
@ -2790,7 +2779,6 @@ uint8_t BlockchainLMDB::get_hard_fork_version(uint64_t height) const
check_open();
TXN_PREFIX_RDONLY();
const mdb_txn_cursors *m_cursors = m_write_txn ? &m_wcursors : &m_tinfo->m_ti_rcursors;
RCURSOR(hf_versions);
MDB_val_copy<uint64_t> val_key(height);

View File

@ -272,7 +272,7 @@ public:
virtual void block_txn_start(bool readonly);
virtual void block_txn_stop();
virtual void block_txn_abort();
virtual bool block_rtxn_start() const;
virtual bool block_rtxn_start(MDB_txn **mtxn, mdb_txn_cursors **mcur) const;
virtual void block_rtxn_stop() const;
virtual void pop_block(block& blk, std::vector<transaction>& txs);
@ -388,6 +388,7 @@ private:
std::string m_folder;
mdb_txn_safe* m_write_txn; // may point to either a short-lived txn or a batch txn
mdb_txn_safe* m_write_batch_txn; // persist batch txn outside of BlockchainLMDB
boost::thread::id m_writer;
bool m_batch_transactions; // support for batch transactions
bool m_batch_active; // whether batch transaction is in progress