database: add resizing

This commit is contained in:
Jethro Grassie 2019-08-18 13:49:02 -04:00
parent 9dd062d8c5
commit 37f605efdc
No known key found for this signature in database
GPG Key ID: DE8ED755616565BB
1 changed files with 98 additions and 24 deletions

View File

@ -71,7 +71,7 @@ developers.
#include "forkoff.h"
#define MAX_LINE 8192
#define POOL_CLIENTS_GROW 1024
#define POOL_CLIENTS_GROW 0x4000
#define RPC_BODY_MAX 8192
#define JOB_BODY_MAX 8192
#define ERROR_BODY_MAX 512
@ -82,7 +82,8 @@ developers.
#define MAINNET_ADDRESS_PREFIX 18
#define TESTNET_ADDRESS_PREFIX 53
#define BLOCK_HEADERS_RANGE 10
#define DB_SIZE 0x140000000 /* 5G */
#define DB_INIT_SIZE 0x140000000 /* 5G */
#define DB_GROW_SIZE 0xA0000000 /* 2.5G */
#define DB_COUNT_MAX 10
#define MAX_PATH 1024
#define RPC_PATH "/json_rpc"
@ -272,6 +273,7 @@ static BIGNUM *base_diff;
static pool_stats_t pool_stats;
static pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mutex_log = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mutex_tx = PTHREAD_MUTEX_INITIALIZER;
static FILE *fd_log;
static unsigned char sec_view[32];
static unsigned char pub_spend[32];
@ -372,6 +374,62 @@ compare_payment(const MDB_val *a, const MDB_val *b)
return (va->timestamp < vb->timestamp) ? -1 : 1;
}
static int
database_resize(void)
{
const double threshold = 0.9;
MDB_envinfo ei;
MDB_stat st;
int rc;
char *err;
mdb_env_info(env, &ei);
mdb_env_stat(env, &st);
if(ei.me_mapsize < DB_INIT_SIZE)
{
if ((rc = pthread_mutex_trylock(&mutex_tx)))
{
log_warn("Cannot cannot acquire lock");
return rc;
}
if ((rc = mdb_env_set_mapsize(env, DB_INIT_SIZE)) != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
goto unlock;
}
log_debug("Database initial size: %"PRIu64, DB_INIT_SIZE);
goto unlock;
}
uint64_t used = st.ms_psize * ei.me_last_pgno;
uint64_t remaining = (uint64_t) ei.me_mapsize - used;
log_debug("Database (used/free): %"PRIu64"/%"PRIu64, used, remaining);
if ((double)used / ei.me_mapsize > threshold)
{
uint64_t ns = (uint64_t) ei.me_mapsize + DB_GROW_SIZE;
if ((rc = pthread_mutex_trylock(&mutex_tx)))
{
log_warn("Cannot cannot acquire lock");
return rc;
}
if ((rc = mdb_env_set_mapsize(env, ns)) != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
goto unlock;
}
log_info("Database resized to: %"PRIu64, ns);
goto unlock;
}
return 0;
unlock:
pthread_mutex_unlock(&mutex_tx);
return rc;
}
static int
database_init(const char* data_dir)
{
@ -381,13 +439,17 @@ database_init(const char* data_dir)
rc = mdb_env_create(&env);
mdb_env_set_maxdbs(env, (MDB_dbi) DB_COUNT_MAX);
mdb_env_set_mapsize(env, DB_SIZE);
if ((rc = mdb_env_open(env, data_dir, 0, 0664)) != 0)
{
err = mdb_strerror(rc);
log_fatal("%s (%s)", err, data_dir);
exit(rc);
}
if ((rc = database_resize()))
{
log_fatal("Cannot resize DB");
exit(rc);
}
if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
{
err = mdb_strerror(rc);
@ -536,13 +598,12 @@ store_block(uint64_t height, block_t *block)
uint64_t
miner_hr(const char *address)
{
pthread_mutex_lock(&mutex_clients);
client_t *c = pool_clients.clients;
uint64_t hr = 0;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since != 0
&& strncmp(c->address, address, ADDRESS_MAX) == 0)
if (c->connected_since
&& !strncmp(c->address, address, ADDRESS_MAX))
{
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
@ -551,36 +612,39 @@ miner_hr(const char *address)
continue;
}
}
pthread_mutex_unlock(&mutex_clients);
return hr;
}
uint64_t
miner_balance(const char *address)
{
if (strlen(address) > ADDRESS_MAX)
return 0;
int rc;
char *err;
MDB_txn *txn;
MDB_cursor *cursor;
MDB_txn *txn = NULL;
MDB_cursor *cursor = NULL;
uint64_t balance = 0;
if (strlen(address) > ADDRESS_MAX)
return balance;
pthread_mutex_trylock(&mutex_tx);
if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)) != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
return 0;
goto cleanup;
}
if ((rc = mdb_cursor_open(txn, db_balance, &cursor)) != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
mdb_txn_abort(txn);
return 0;
goto cleanup;
}
MDB_val key = {ADDRESS_MAX, (void*)address};
MDB_val val;
uint64_t balance = 0;
rc = mdb_cursor_get(cursor, &key, &val, MDB_SET);
if (rc != 0 && rc != MDB_NOTFOUND)
@ -595,8 +659,11 @@ miner_balance(const char *address)
balance = *(uint64_t*)val.mv_data;
cleanup:
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
pthread_mutex_unlock(&mutex_tx);
if (cursor)
mdb_cursor_close(cursor);
if (txn)
mdb_txn_abort(txn);
return balance;
}
@ -849,7 +916,7 @@ update_pool_hr(void)
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since != 0)
if (c->connected_since)
{
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
@ -857,6 +924,7 @@ update_pool_hr(void)
hr += c->hashes / d;
}
}
log_debug("Pool hashrate: %"PRIu64, hr);
pool_stats.pool_hashrate = hr;
}
@ -2583,6 +2651,8 @@ timer_on_120s(int fd, short kind, void *ctx)
static void
timer_on_10m(int fd, short kind, void *ctx)
{
if (database_resize())
log_warn("DB resize needed, will retry later");
send_payments();
struct timeval timeout = { .tv_sec = 600, .tv_usec = 0 };
evtimer_add(timer_10m, &timeout);
@ -3222,15 +3292,16 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
unsigned char tnt[9];
size_t len;
pthread_mutex_lock(&mutex_clients);
client_find(bev, &client);
if (!client)
return;
goto unlock;
if (!client->downstream)
{
/* should never happen; sanity check */
log_trace("Only trusted downstreams allowed");
client_clear(bev);
return;
goto unlock;
}
input = bufferevent_get_input(bev);
@ -3243,7 +3314,7 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
log_error("Bad message from downstream");
evbuffer_drain(input, len);
client_clear(bev);
return;
goto unlock;
}
evbuffer_copyout(input, tnt, 9);
@ -3257,7 +3328,7 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
break;
case BIN_CONNECT:
if (len - 9 < sizeof(uint32_t))
return;
goto unlock;
evbuffer_drain(input, 9);
trusted_on_client_connect(client);
break;
@ -3267,13 +3338,13 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
break;
case BIN_SHARE:
if (len - 9 < sizeof(share_t))
return;
goto unlock;
evbuffer_drain(input, 9);
trusted_on_client_share(client);
break;
case BIN_BLOCK:
if (len - 9 < sizeof(block_t))
return;
goto unlock;
evbuffer_drain(input, 9);
trusted_on_client_block(client);
break;
@ -3281,9 +3352,11 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
log_error("Unknown message: %d", tnt[8]);
evbuffer_drain(input, len);
client_clear(bev);
return;
goto unlock;
}
}
unlock:
pthread_mutex_unlock(&mutex_clients);
}
static void
@ -3907,6 +3980,7 @@ cleanup(void)
rx_slow_hash_free_state();
pthread_mutex_destroy(&mutex_clients);
pthread_mutex_destroy(&mutex_log);
pthread_mutex_destroy(&mutex_tx);
log_info("Pool shutdown successfully");
if (fd_log)
fclose(fd_log);