From 37f605efdc18fd8d7d73b5ec92604fb342496268 Mon Sep 17 00:00:00 2001 From: Jethro Grassie Date: Sun, 18 Aug 2019 13:49:02 -0400 Subject: [PATCH] database: add resizing --- src/pool.c | 122 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 24 deletions(-) diff --git a/src/pool.c b/src/pool.c index f1ea7a2..9fae143 100644 --- a/src/pool.c +++ b/src/pool.c @@ -71,7 +71,7 @@ developers. #include "forkoff.h" #define MAX_LINE 8192 -#define POOL_CLIENTS_GROW 1024 +#define POOL_CLIENTS_GROW 0x4000 #define RPC_BODY_MAX 8192 #define JOB_BODY_MAX 8192 #define ERROR_BODY_MAX 512 @@ -82,7 +82,8 @@ developers. #define MAINNET_ADDRESS_PREFIX 18 #define TESTNET_ADDRESS_PREFIX 53 #define BLOCK_HEADERS_RANGE 10 -#define DB_SIZE 0x140000000 /* 5G */ +#define DB_INIT_SIZE 0x140000000 /* 5G */ +#define DB_GROW_SIZE 0xA0000000 /* 2.5G */ #define DB_COUNT_MAX 10 #define MAX_PATH 1024 #define RPC_PATH "/json_rpc" @@ -272,6 +273,7 @@ static BIGNUM *base_diff; static pool_stats_t pool_stats; static pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mutex_log = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t mutex_tx = PTHREAD_MUTEX_INITIALIZER; static FILE *fd_log; static unsigned char sec_view[32]; static unsigned char pub_spend[32]; @@ -372,6 +374,62 @@ compare_payment(const MDB_val *a, const MDB_val *b) return (va->timestamp < vb->timestamp) ? -1 : 1; } +static int +database_resize(void) +{ + const double threshold = 0.9; + MDB_envinfo ei; + MDB_stat st; + int rc; + char *err; + + mdb_env_info(env, &ei); + mdb_env_stat(env, &st); + + if(ei.me_mapsize < DB_INIT_SIZE) + { + if ((rc = pthread_mutex_trylock(&mutex_tx))) + { + log_warn("Cannot cannot acquire lock"); + return rc; + } + if ((rc = mdb_env_set_mapsize(env, DB_INIT_SIZE)) != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); + goto unlock; + } + log_debug("Database initial size: %"PRIu64, DB_INIT_SIZE); + goto unlock; + } + + uint64_t used = st.ms_psize * ei.me_last_pgno; + uint64_t remaining = (uint64_t) ei.me_mapsize - used; + log_debug("Database (used/free): %"PRIu64"/%"PRIu64, used, remaining); + + if ((double)used / ei.me_mapsize > threshold) + { + uint64_t ns = (uint64_t) ei.me_mapsize + DB_GROW_SIZE; + if ((rc = pthread_mutex_trylock(&mutex_tx))) + { + log_warn("Cannot cannot acquire lock"); + return rc; + } + if ((rc = mdb_env_set_mapsize(env, ns)) != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); + goto unlock; + } + log_info("Database resized to: %"PRIu64, ns); + goto unlock; + } + return 0; +unlock: + pthread_mutex_unlock(&mutex_tx); + return rc; +} + static int database_init(const char* data_dir) { @@ -381,13 +439,17 @@ database_init(const char* data_dir) rc = mdb_env_create(&env); mdb_env_set_maxdbs(env, (MDB_dbi) DB_COUNT_MAX); - mdb_env_set_mapsize(env, DB_SIZE); if ((rc = mdb_env_open(env, data_dir, 0, 0664)) != 0) { err = mdb_strerror(rc); log_fatal("%s (%s)", err, data_dir); exit(rc); } + if ((rc = database_resize())) + { + log_fatal("Cannot resize DB"); + exit(rc); + } if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) { err = mdb_strerror(rc); @@ -536,13 +598,12 @@ store_block(uint64_t height, block_t *block) uint64_t miner_hr(const char *address) { - pthread_mutex_lock(&mutex_clients); client_t *c = pool_clients.clients; uint64_t hr = 0; for (size_t i = 0; i < pool_clients.count; i++, c++) { - if (c->connected_since != 0 - && strncmp(c->address, address, ADDRESS_MAX) == 0) + if (c->connected_since + && !strncmp(c->address, address, ADDRESS_MAX)) { double d = difftime(time(NULL), c->connected_since); if (d == 0.0) @@ -551,36 +612,39 @@ miner_hr(const char *address) continue; } } - pthread_mutex_unlock(&mutex_clients); return hr; } uint64_t miner_balance(const char *address) { - if (strlen(address) > ADDRESS_MAX) - return 0; int rc; char *err; - MDB_txn *txn; - MDB_cursor *cursor; + MDB_txn *txn = NULL; + MDB_cursor *cursor = NULL; + uint64_t balance = 0; + + if (strlen(address) > ADDRESS_MAX) + return balance; + + pthread_mutex_trylock(&mutex_tx); + if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)) != 0) { err = mdb_strerror(rc); log_error("%s", err); - return 0; + goto cleanup; } if ((rc = mdb_cursor_open(txn, db_balance, &cursor)) != 0) { err = mdb_strerror(rc); log_error("%s", err); mdb_txn_abort(txn); - return 0; + goto cleanup; } MDB_val key = {ADDRESS_MAX, (void*)address}; MDB_val val; - uint64_t balance = 0; rc = mdb_cursor_get(cursor, &key, &val, MDB_SET); if (rc != 0 && rc != MDB_NOTFOUND) @@ -595,8 +659,11 @@ miner_balance(const char *address) balance = *(uint64_t*)val.mv_data; cleanup: - mdb_cursor_close(cursor); - mdb_txn_abort(txn); + pthread_mutex_unlock(&mutex_tx); + if (cursor) + mdb_cursor_close(cursor); + if (txn) + mdb_txn_abort(txn); return balance; } @@ -849,7 +916,7 @@ update_pool_hr(void) client_t *c = pool_clients.clients; for (size_t i = 0; i < pool_clients.count; i++, c++) { - if (c->connected_since != 0) + if (c->connected_since) { double d = difftime(time(NULL), c->connected_since); if (d == 0.0) @@ -857,6 +924,7 @@ update_pool_hr(void) hr += c->hashes / d; } } + log_debug("Pool hashrate: %"PRIu64, hr); pool_stats.pool_hashrate = hr; } @@ -2583,6 +2651,8 @@ timer_on_120s(int fd, short kind, void *ctx) static void timer_on_10m(int fd, short kind, void *ctx) { + if (database_resize()) + log_warn("DB resize needed, will retry later"); send_payments(); struct timeval timeout = { .tv_sec = 600, .tv_usec = 0 }; evtimer_add(timer_10m, &timeout); @@ -3222,15 +3292,16 @@ trusted_on_read(struct bufferevent *bev, void *ctx) unsigned char tnt[9]; size_t len; + pthread_mutex_lock(&mutex_clients); client_find(bev, &client); if (!client) - return; + goto unlock; if (!client->downstream) { /* should never happen; sanity check */ log_trace("Only trusted downstreams allowed"); client_clear(bev); - return; + goto unlock; } input = bufferevent_get_input(bev); @@ -3243,7 +3314,7 @@ trusted_on_read(struct bufferevent *bev, void *ctx) log_error("Bad message from downstream"); evbuffer_drain(input, len); client_clear(bev); - return; + goto unlock; } evbuffer_copyout(input, tnt, 9); @@ -3257,7 +3328,7 @@ trusted_on_read(struct bufferevent *bev, void *ctx) break; case BIN_CONNECT: if (len - 9 < sizeof(uint32_t)) - return; + goto unlock; evbuffer_drain(input, 9); trusted_on_client_connect(client); break; @@ -3267,13 +3338,13 @@ trusted_on_read(struct bufferevent *bev, void *ctx) break; case BIN_SHARE: if (len - 9 < sizeof(share_t)) - return; + goto unlock; evbuffer_drain(input, 9); trusted_on_client_share(client); break; case BIN_BLOCK: if (len - 9 < sizeof(block_t)) - return; + goto unlock; evbuffer_drain(input, 9); trusted_on_client_block(client); break; @@ -3281,9 +3352,11 @@ trusted_on_read(struct bufferevent *bev, void *ctx) log_error("Unknown message: %d", tnt[8]); evbuffer_drain(input, len); client_clear(bev); - return; + goto unlock; } } +unlock: + pthread_mutex_unlock(&mutex_clients); } static void @@ -3907,6 +3980,7 @@ cleanup(void) rx_slow_hash_free_state(); pthread_mutex_destroy(&mutex_clients); pthread_mutex_destroy(&mutex_log); + pthread_mutex_destroy(&mutex_tx); log_info("Pool shutdown successfully"); if (fd_log) fclose(fd_log);