BlockchainLMDB: Add support for batch transactions

This commit is contained in:
warptangent 2015-02-11 15:55:53 -08:00
parent 8909d7d82e
commit 58ecc58be1
No known key found for this signature in database
GPG key ID: 0E490BEBFBE4E92D
2 changed files with 289 additions and 55 deletions

View file

@ -600,15 +600,23 @@ void BlockchainLMDB::check_open() const
BlockchainLMDB::~BlockchainLMDB()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
// batch transaction shouldn't be active at this point. If it is, consider it aborted.
if (m_batch_active)
batch_abort();
}
BlockchainLMDB::BlockchainLMDB()
BlockchainLMDB::BlockchainLMDB(bool batch_transactions)
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
// initialize folder to something "safe" just in case
// someone accidentally misuses this class...
m_folder = "thishsouldnotexistbecauseitisgibberish";
m_open = false;
m_batch_transactions = batch_transactions;
m_write_txn = nullptr;
m_batch_active = false;
m_height = 0;
}
@ -720,6 +728,13 @@ void BlockchainLMDB::create(const std::string& filename)
void BlockchainLMDB::close()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
if (m_batch_active)
{
LOG_PRINT_L3("close() first calling batch_abort() due to active batch transaction");
batch_abort();
}
this->sync();
// FIXME: not yet thread safe!!! Use with care.
mdb_env_close(m_env);
}
@ -727,7 +742,13 @@ void BlockchainLMDB::close()
void BlockchainLMDB::sync()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
// LMDB documentation leads me to believe this is unnecessary
// Does nothing unless LMDB environment was opened with MDB_NOSYNC or in part
// MDB_NOMETASYNC. Force flush to be synchronous.
if (auto result = mdb_env_sync(m_env, true))
{
throw0(DB_ERROR(std::string("Failed to sync database").append(mdb_strerror(result)).c_str()));
}
}
void BlockchainLMDB::reset()
@ -867,12 +888,17 @@ uint64_t BlockchainLMDB::get_block_timestamp(const uint64_t& height) const
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<uint64_t> key(height);
MDB_val result;
auto get_result = mdb_get(txn, m_block_timestamps, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_block_timestamps, &key, &result);
if (get_result == MDB_NOTFOUND)
{
throw0(DB_ERROR(std::string("Attempt to get timestamp from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- timestamp not in db").c_str()));
@ -880,6 +906,7 @@ uint64_t BlockchainLMDB::get_block_timestamp(const uint64_t& height) const
else if (get_result)
throw0(DB_ERROR("Error attempting to retrieve a timestamp from the db"));
if (! m_batch_active)
txn.commit();
return *(const uint64_t*)result.mv_data;
}
@ -904,12 +931,18 @@ size_t BlockchainLMDB::get_block_size(const uint64_t& height) const
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<uint64_t> key(height);
MDB_val result;
auto get_result = mdb_get(txn, m_block_sizes, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_block_sizes, &key, &result);
if (get_result == MDB_NOTFOUND)
{
throw0(DB_ERROR(std::string("Attempt to get block size from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- block size not in db").c_str()));
@ -917,6 +950,7 @@ size_t BlockchainLMDB::get_block_size(const uint64_t& height) const
else if (get_result)
throw0(DB_ERROR("Error attempting to retrieve a block size from the db"));
if (! m_batch_active)
txn.commit();
return *(const size_t*)result.mv_data;
}
@ -927,12 +961,17 @@ difficulty_type BlockchainLMDB::get_block_cumulative_difficulty(const uint64_t&
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<uint64_t> key(height);
MDB_val result;
auto get_result = mdb_get(txn, m_block_diffs, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_block_diffs, &key, &result);
if (get_result == MDB_NOTFOUND)
{
throw0(DB_ERROR(std::string("Attempt to get cumulative difficulty from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- difficulty not in db").c_str()));
@ -940,6 +979,7 @@ difficulty_type BlockchainLMDB::get_block_cumulative_difficulty(const uint64_t&
else if (get_result)
throw0(DB_ERROR("Error attempting to retrieve a cumulative difficulty from the db"));
if (! m_batch_active)
txn.commit();
return *(difficulty_type*)result.mv_data;
}
@ -967,12 +1007,18 @@ uint64_t BlockchainLMDB::get_block_already_generated_coins(const uint64_t& heigh
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<uint64_t> key(height);
MDB_val result;
auto get_result = mdb_get(txn, m_block_coins, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_block_coins, &key, &result);
if (get_result == MDB_NOTFOUND)
{
throw0(DB_ERROR(std::string("Attempt to get generated coins from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- block size not in db").c_str()));
@ -980,6 +1026,7 @@ uint64_t BlockchainLMDB::get_block_already_generated_coins(const uint64_t& heigh
else if (get_result)
throw0(DB_ERROR("Error attempting to retrieve a total generated coins from the db"));
if (! m_batch_active)
txn.commit();
return *(const uint64_t*)result.mv_data;
}
@ -990,19 +1037,27 @@ crypto::hash BlockchainLMDB::get_block_hash_from_height(const uint64_t& height)
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<uint64_t> key(height);
MDB_val result;
auto get_result = mdb_get(txn, m_block_hashes, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_block_hashes, &key, &result);
if (get_result == MDB_NOTFOUND)
{
throw0(BLOCK_DNE(std::string("Attempt to get hash from height ").append(boost::lexical_cast<std::string>(height)).append(" failed -- hash not in db").c_str()));
}
else if (get_result)
throw0(DB_ERROR("Error attempting to retrieve a block hash from the db"));
throw0(DB_ERROR(std::string("Error attempting to retrieve a block hash from the db: ").
append(mdb_strerror(get_result)).c_str()));
if (! m_batch_active)
txn.commit();
return *(crypto::hash*)result.mv_data;
}
@ -1075,6 +1130,10 @@ bool BlockchainLMDB::tx_exists(const crypto::hash& h) const
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open();
if (m_batch_active)
{
LOG_PRINT_L0("WARNING: active batch transaction while creating a read-only txn in tx_exists()");
}
txn_safe txn;
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
@ -1120,12 +1179,18 @@ transaction BlockchainLMDB::get_tx(const crypto::hash& h) const
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<crypto::hash> key(h);
MDB_val result;
auto get_result = mdb_get(txn, m_txs, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_txs, &key, &result);
if (get_result == MDB_NOTFOUND)
throw1(TX_DNE(std::string("tx with hash ").append(epee::string_tools::pod_to_hex(h)).append(" not found in db").c_str()));
else if (get_result)
@ -1137,6 +1202,8 @@ transaction BlockchainLMDB::get_tx(const crypto::hash& h) const
transaction tx;
if (!parse_and_validate_tx_from_blob(bd, tx))
throw0(DB_ERROR("Failed to parse tx from blob retrieved from the db"));
if (! m_batch_active)
txn.commit();
return tx;
}
@ -1178,13 +1245,27 @@ uint64_t BlockchainLMDB::get_tx_block_height(const crypto::hash& h) const
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open();
// If m_batch_active is set, a batch transaction exists beyond this class,
// such as a batch import with verification enabled, or possibly (later) a
// batch network sync.
//
// A regular network sync without batching would be expected to open a new
// read transaction here, as validation is done prior to the write for block
// and tx data.
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<crypto::hash> key(h);
MDB_val result;
auto get_result = mdb_get(txn, m_tx_heights, &key, &result);
auto get_result = mdb_get(*txn_ptr, m_tx_heights, &key, &result);
if (get_result == MDB_NOTFOUND)
{
throw1(TX_DNE(std::string("tx height with hash ").append(epee::string_tools::pod_to_hex(h)).append(" not found in db").c_str()));
@ -1192,6 +1273,9 @@ uint64_t BlockchainLMDB::get_tx_block_height(const crypto::hash& h) const
else if (get_result)
throw0(DB_ERROR("DB error attempting to fetch tx height from hash"));
if (! m_batch_active)
txn.commit();
return *(const uint64_t*)result.mv_data;
}
@ -1334,13 +1418,18 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index_from_global(const uint64_t&
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
}
MDB_val_copy<uint64_t> k(index);
MDB_val v;
auto get_result = mdb_get(txn, m_output_txs, &k, &v);
auto get_result = mdb_get(*txn_ptr, m_output_txs, &k, &v);
if (get_result == MDB_NOTFOUND)
throw1(OUTPUT_DNE("output with given index not in db"));
else if (get_result)
@ -1348,11 +1437,13 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index_from_global(const uint64_t&
crypto::hash tx_hash = *(crypto::hash*)v.mv_data;
get_result = mdb_get(txn, m_output_indices, &k, &v);
get_result = mdb_get(*txn_ptr, m_output_indices, &k, &v);
if (get_result == MDB_NOTFOUND)
throw1(OUTPUT_DNE("output with given index not in db"));
else if (get_result)
throw0(DB_ERROR("DB error attempting to fetch output tx index"));
if (! m_batch_active)
txn.commit();
return tx_out_index(tx_hash, *(const uint64_t *)v.mv_data);
}
@ -1363,10 +1454,15 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index(const uint64_t& amount, con
check_open();
txn_safe txn;
txn_safe* txn_ptr = &txn;
if (m_batch_active)
txn_ptr = m_write_txn;
else
{
if (mdb_txn_begin(m_env, NULL, MDB_RDONLY, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
lmdb_cur cur(txn, m_output_amounts);
}
lmdb_cur cur(*txn_ptr, m_output_amounts);
MDB_val_copy<uint64_t> k(amount);
MDB_val v;
@ -1395,6 +1491,7 @@ tx_out_index BlockchainLMDB::get_output_tx_and_index(const uint64_t& amount, con
cur.close();
if (! m_batch_active)
txn.commit();
return get_output_tx_and_index_from_global(glob_index);
@ -1538,6 +1635,85 @@ bool BlockchainLMDB::has_key_image(const crypto::key_image& img) const
return false;
}
void BlockchainLMDB::batch_start()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
if (! m_batch_transactions)
throw0(DB_ERROR("batch transactions not enabled"));
if (m_batch_active)
throw0(DB_ERROR("batch transaction already in progress"));
if (m_write_txn)
throw0(DB_ERROR("batch transaction attempted, but m_write_txn already in use"));
check_open();
// NOTE: need to make sure it's destroyed properly when done
if (mdb_txn_begin(m_env, NULL, 0, m_write_batch_txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
// indicates this transaction is for batch transactions, but not whether it's
// active
m_write_batch_txn.m_batch_txn = true;
m_write_txn = &m_write_batch_txn;
m_batch_active = true;
LOG_PRINT_L3("batch transaction: begin");
}
void BlockchainLMDB::batch_commit()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
if (! m_batch_transactions)
throw0(DB_ERROR("batch transactions not enabled"));
if (! m_batch_active)
throw0(DB_ERROR("batch transaction not in progress"));
check_open();
LOG_PRINT_L3("batch transaction: committing...");
m_write_txn->commit();
LOG_PRINT_L3("batch transaction: committed");
if (mdb_txn_begin(m_env, NULL, 0, m_write_batch_txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
if (! m_write_batch_txn.m_batch_txn)
throw0(DB_ERROR("m_write_batch_txn not marked as a batch transaction"));
m_write_txn = &m_write_batch_txn;
}
void BlockchainLMDB::batch_stop()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
if (! m_batch_transactions)
throw0(DB_ERROR("batch transactions not enabled"));
if (! m_batch_active)
throw0(DB_ERROR("batch transaction not in progress"));
check_open();
LOG_PRINT_L3("batch transaction: committing...");
m_write_txn->commit();
// for destruction of batch transaction
m_write_txn = nullptr;
m_batch_active = false;
LOG_PRINT_L3("batch transaction: end");
}
void BlockchainLMDB::batch_abort()
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
if (! m_batch_transactions)
throw0(DB_ERROR("batch transactions not enabled"));
if (! m_batch_active)
throw0(DB_ERROR("batch transaction not in progress"));
check_open();
// for destruction of batch transaction
m_write_txn = nullptr;
// explicitly call in case mdb_env_close() (BlockchainLMDB::close()) called before BlockchainLMDB destructor called.
m_write_batch_txn.abort();
m_batch_active = false;
LOG_PRINT_L3("batch transaction: aborted");
}
void BlockchainLMDB::set_batch_transactions(bool batch_transactions)
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
m_batch_transactions = batch_transactions;
LOG_PRINT_L3("batch transactions " << (m_batch_transactions ? "enabled" : "disabled"));
}
uint64_t BlockchainLMDB::add_block( const block& blk
, const size_t& block_size
, const difficulty_type& cumulative_difficulty
@ -1547,22 +1723,30 @@ uint64_t BlockchainLMDB::add_block( const block& blk
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
check_open();
txn_safe txn;
if (! m_batch_active)
{
if (mdb_txn_begin(m_env, NULL, 0, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
m_write_txn = &txn;
}
uint64_t num_outputs = m_num_outputs;
try
{
BlockchainDB::add_block(blk, block_size, cumulative_difficulty, coins_generated, txs);
if (! m_batch_active)
{
m_write_txn = NULL;
txn.commit();
}
}
catch (...)
{
m_num_outputs = num_outputs;
if (! m_batch_active)
m_write_txn = NULL;
throw;
}
@ -1574,18 +1758,24 @@ void BlockchainLMDB::pop_block(block& blk, std::vector<transaction>& txs)
{
LOG_PRINT_L3("BlockchainLMDB::" << __func__);
txn_safe txn;
if (! m_batch_active)
{
if (mdb_txn_begin(m_env, NULL, 0, txn))
throw0(DB_ERROR("Failed to create a transaction for the db"));
m_write_txn = &txn;
}
uint64_t num_outputs = m_num_outputs;
try
{
BlockchainDB::pop_block(blk, txs);
if (! m_batch_active)
{
m_write_txn = NULL;
txn.commit();
}
}
catch (...)
{
m_num_outputs = num_outputs;

View file

@ -38,8 +38,23 @@ struct txn_safe
txn_safe() : m_txn(NULL) { }
~txn_safe()
{
if(m_txn != NULL)
LOG_PRINT_L3("txn_safe: destructor");
if (m_txn != NULL)
{
if (m_batch_txn) // this is a batch txn and should have been handled before this point for safety
{
LOG_PRINT_L0("WARNING: txn_safe: m_txn is a batch txn and it's not NULL in destructor - calling mdb_txn_abort()");
}
else
{
// Example of when this occurs: a lookup fails, so a read-only txn is
// aborted through this destructor. However, successful read-only txns
// ideally should have been committed when done and not end up here.
//
// NOTE: not sure if this is ever reached for a non-batch write
// transaction, but it's probably not ideal if it did.
LOG_PRINT_L3("txn_safe: m_txn not NULL in destructor - calling mdb_txn_abort()");
}
mdb_txn_abort(m_txn);
}
}
@ -60,6 +75,24 @@ struct txn_safe
m_txn = NULL;
}
// This should only be needed for batch transaction which must be ensured to
// be aborted before mdb_env_close, not after. So we can't rely on
// BlockchainLMDB destructor to call txn_safe destructor, as that's too late
// to properly abort, since mdb_env_close would have been called earlier.
void abort()
{
LOG_PRINT_L3("txn_safe: abort()");
if(m_txn != NULL)
{
mdb_txn_abort(m_txn);
m_txn = NULL;
}
else
{
LOG_PRINT_L0("WARNING: txn_safe: abort() called, but m_txn is NULL");
}
}
operator MDB_txn*()
{
return m_txn;
@ -71,13 +104,14 @@ struct txn_safe
}
MDB_txn* m_txn;
bool m_batch_txn = false;
};
class BlockchainLMDB : public BlockchainDB
{
public:
BlockchainLMDB();
BlockchainLMDB(bool batch_transactions=false);
~BlockchainLMDB();
virtual void open(const std::string& filename);
@ -177,6 +211,12 @@ public:
, const std::vector<transaction>& txs
);
virtual void set_batch_transactions(bool batch_transactions);
virtual void batch_start();
virtual void batch_commit();
virtual void batch_stop();
virtual void batch_abort();
virtual void pop_block(block& blk, std::vector<transaction>& txs);
private:
@ -264,7 +304,11 @@ private:
uint64_t m_height;
uint64_t m_num_outputs;
std::string m_folder;
txn_safe* m_write_txn;
txn_safe* m_write_txn; // may point to either a short-lived txn or a batch txn
txn_safe m_write_batch_txn; // persist batch txn outside of BlockchainLMDB
bool m_batch_transactions; // support for batch transactions
bool m_batch_active; // whether batch transaction is in progress
};
} // namespace cryptonote