From 97c3993cf7dbd069dbcaa1ec1eaa675fe8c4c652 Mon Sep 17 00:00:00 2001
From: Jethro Grassie
Date: Sun, 19 Apr 2020 01:48:15 -0400
Subject: [PATCH] feature: interconnected pools

---
 README.md | 115 +++-
 pool.conf | 6 +
 src/pool.c | 1128 +++++++++++++++++++++++++++++++++------
 src/webui.c | 2 +-
 src/webui.h | 6 +-
 stratum-ss.md => sss.md | 0
 6 files changed, 1079 insertions(+), 178 deletions(-)
 rename stratum-ss.md => sss.md (100%)

diff --git a/README.md b/README.md
index c889431..216ea9b 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,10 @@ this thanks to the efficiency of both LMDB and libevent (for the stratum
 clients) and some sensible proxying/caching being placed in front of the
 [web UI](#web-ui).
 
+Configuration is extremely flexible, now allowing the pool to run in a
+variety of setups, such as highly available and redundant configurations.
+This is discussed further below in [Interconnected pools](#interconnected-pools).
+
 This pool was the *first* pool to support RandomX and is currently the *only*
 pool which supports the RandomX fast/full-memory mode.
 
@@ -18,7 +22,7 @@ focussed on performance, efficiency and stability.
 
 The pool also supports an optional method of mining whereby miners select their
 *own* block template to mine on. Further information can be found in the
-document: [Stratum mode self-select](./stratum-ss.md).
+document: [Stratum mode self-select](./sss.md).
 
 For testing, a reference mainnet pool can be found at
 [monerop.com](http://monerop.com).
@@ -83,15 +87,14 @@ should all be self explanatory.
 
 There are also some [command-line parameters](#command-line-parameters) which
 can be used to override some of these settings.
 
-#### Block notification
+### Block notification
 
-There is one configuration option that deserves a special mention.
-
-You can optionally start the pool with the flag `--block-notified` (or set in
+The pool can optionally be started with the flag `--block-notified` (or set in
 the config file: `block-notified = 1`). This will prevent the pool from
-*polling* for new blocks using a timer, and instead, fetch a new block template
-when it receives a *signal* (specifically, *SIGUSR1*). Now whenever you start
-`monerod`, you'll make use of its `--block-notify` option.
+*polling* for new blocks (using a timer), and instead fetch a new block template
+when it receives a *signal* (specifically, *SIGUSR1*). The Monero daemon,
+`monerod`, has a feature whereby it can execute a command whenever a block is
+added to the chain, which can thus be used to generate the required signal.
 
 E.g.
 
@@ -99,13 +102,79 @@ E.g.
 
     monerod ... --block-notify '/usr/bin/pkill -USR1 monero-pool'
 
-This instructs `monerod` to send the required signal, *SIGUSR1*, to your pool
-whenever a new block is added to the chain.
+Launching `monerod` like this instructs it to send the required signal,
+*SIGUSR1*, to the pool whenever a new block is added to the chain.
 
-Using this mechanism has a *significant* benefit - your pool *immediatley* knows
+Using this mechanism has a *significant* benefit - your pool *immediately* knows
 when to fetch a new block template to send to your miners. You're essentially
 giving your miners a head-start over miners in pools which use polling (which is
-what all the other pool implementations do).
+what all the other pool implementations currently do).
+
+### Interconnected pools
+
+In some situations it's desirable to run multiple pool instances that behave as
+one. Some examples being:
+
+ - When running a global public pool, it's desirable to offer extremely
+   low-latency pool connections to geographically dispersed miners.
+ - When running a private pool across multiple data centers, it's desirable to
+   make use of the internal site-to-site network makeup.
+ - Where there are multiple hosts available, it's desirable to offer extra
+   redundancy.
+
+To meet these needs, multiple instances of the pool can be run, with each
+behaving either as an edge pool, an upstream pool, both (i.e. bridged), or a
+normal single pool.
+
+Any pool that has an upstream pool configured does almost everything a normal
+pool does, except that it offloads payout processing to its upstream pool by
+relaying validated shares and blocks to it. In return, the upstream pool
+sends back the combined pool stats and balance updates, and handles the
+payout processing. Should an upstream become unreachable, the downstream
+pools continue as normal, then upon reconnection to the upstream, send over
+the backlog of shares and blocks accumulated whilst the upstream was
+unreachable.
+
+Configuration is fairly trivial. A pool that will allow downstream pools to
+connect to it does so via the config file parameters `trusted-listen`,
+`trusted-port` and `trusted-allowed`. E.g.
+
+    trusted-listen = 10.0.0.1
+    trusted-port = 4244
+    trusted-allowed = 10.0.0.2,10.0.0.3
+
+As share validation is performed on the edge pools, it's ***vitally*** important
+that this trusted listener is secured. Ideally it should only be bound to an
+internal / private network interface, with the IP addresses of the downstream
+pools allowed to connect to it specified (as in the example above). If the
+interface being bound to is already secured, `trusted-allowed` can be omitted.
+
+The downstream pools (`10.0.0.2` and `10.0.0.3` in the above example) then need
+to include in their config files the parameters `upstream-host` and
+`upstream-port`. E.g.
+
+    upstream-host = 10.0.0.1
+    upstream-port = 4244
+
+To create a bridged pool, use all five parameters discussed above. For example:
+
+    trusted-listen = 10.0.0.4
+    trusted-port = 4244
+    trusted-allowed = 10.0.0.5,10.0.0.6
+    upstream-host = 10.0.0.1
+    upstream-port = 4244
+
+An example of where bridging can be useful is spanning network providers, e.g.
+using a global provider for the main pool hubs (the bridges) and local providers
+for the edge pools within a territory.
+
+Every pool, however configured, still needs RPC access to a Monero daemon. The
+pools can of course all be configured to use the *same* daemon or, for extra
+redundancy, make use of separate daemons. Downstream pools do not need RPC
+access to the pool's wallet; only the final upstream needs wallet access. If
+Stratum mode self-select is being offered, the pool wallet view key can be set
+in the downstream pool config files via the `pool-view-key` parameter, or by
+running a local view-only wallet RPC.
 
 ## Running
 
@@ -132,15 +201,21 @@ command-line parameters:
 
 ## Web UI
 
-There is a minimal web UI that gets served on the port specified in the config
-file. If you plan on running a *public* pool, it's advisable to use either
-Apache or Nginx as a proxy in front of this with some appropriate caching
-configured. The goal is to offload browser based traffic to something built for
-the task and allow the pool to focus on its primary function - serving miners.
+This project is not designed to be a one-stop solution for running a public
+pool; it is a highly efficient mining pool implementation. A public pool
+typically entails having a fancy web UI, and that part is down to you. There
+is however a minimal web UI that gets served on the port specified in the
+config file. If you plan on running a *public* pool via this UI (or any other
+for that matter), it's advisable to use either Apache or Nginx as a proxy in
+front of it with some appropriate caching configured. The goal is to offload
+browser-based traffic to something built for the task and allow the pool to
+focus on its primary function - serving miners.
 
-If you intend to make changes to the web UI, note that the HTML gets compiled
-into the pool binary. The single web page that gets served simply makes use of a
-JSON endpoint to populate the stats.
+If you intend to make changes to this minimal web UI, note that the HTML gets
+compiled into the pool binary. The single web page that gets served simply makes
+use of a JSON endpoint to populate the stats. Thus, a sensible option for your
+own web UI is to simply make use of that endpoint (for stats and balances), and
+keep your website completely separate, served by Apache or Nginx, for example.
 
 ## SSL
 
diff --git a/pool.conf b/pool.conf
index d44d92b..1462d85 100644
--- a/pool.conf
+++ b/pool.conf
@@ -21,3 +21,9 @@ disable-self-select = 0
 data-dir = ./data
 pid-file =
 forked = 0
+# trusted-listen = 127.0.0.1
+# trusted-port = 4244
+# trusted-allowed = 127.0.0.1,127.0.0.2
+# upstream-host = 127.0.0.1
+# upstream-port = 4244
+# pool-view-key =
diff --git a/src/pool.c b/src/pool.c
index c1ba313..5bddec6 100644
--- a/src/pool.c
+++ b/src/pool.c
@@ -75,7 +75,7 @@ developers.
 #define RPC_BODY_MAX 8192
 #define JOB_BODY_MAX 8192
 #define ERROR_BODY_MAX 512
-#define STATUS_BODY_MAX 256
+#define STATUS_BODY_MAX 512
 #define CLIENT_JOBS_MAX 4
 #define BLOCK_HEADERS_MAX 4
 #define BLOCK_TEMPLATES_MAX 4
@@ -91,6 +91,8 @@ developers.
 #define HR_BLOCK_COUNT 5
 #define TEMLATE_HEIGHT_VARIANCE 5
 #define MAX_BAD_SHARES 5
+#define MAX_DOWNSTREAM 8
+#define MAX_HOST 256
 
 #define uint128_t unsigned __int128
 
@@ -118,18 +120,25 @@ developers.
Payments -------- wallet addr <-> payment_t + + Properties + ---------- + name <-> value */ -enum block_status { BLOCK_LOCKED=0, BLOCK_UNLOCKED=1, BLOCK_ORPHANED=2 }; -enum stratum_mode { MODE_NORMAL=0, MODE_SELF_SELECT=1 }; +enum block_status { BLOCK_LOCKED, BLOCK_UNLOCKED, BLOCK_ORPHANED }; +enum stratum_mode { MODE_NORMAL, MODE_SELF_SELECT }; +enum msgbin_type { BIN_PING, BIN_CONNECT, BIN_DISCONNECT, BIN_SHARE, + BIN_BLOCK, BIN_STATS, BIN_BALANCE }; +const unsigned char msgbin[] = {0x4D,0x4E,0x52,0x4F,0x50,0x4F,0x4F,0x4C}; typedef struct config_t { - char rpc_host[256]; - uint32_t rpc_port; + char rpc_host[MAX_HOST]; + uint16_t rpc_port; uint32_t rpc_timeout; - char wallet_rpc_host[256]; - uint32_t wallet_rpc_port; + char wallet_rpc_host[MAX_HOST]; + uint16_t wallet_rpc_port; char pool_wallet[ADDRESS_MAX]; uint64_t pool_start_diff; double share_mul; @@ -137,17 +146,23 @@ typedef struct config_t double retarget_ratio; double pool_fee; double payment_threshold; - char pool_listen[256]; - uint32_t pool_port; - uint32_t pool_ssl_port; + char pool_listen[MAX_HOST]; + uint16_t pool_port; + uint16_t pool_ssl_port; uint32_t log_level; - uint32_t webui_port; + uint16_t webui_port; char log_file[MAX_PATH]; bool block_notified; bool disable_self_select; char data_dir[MAX_PATH]; char pid_file[MAX_PATH]; bool forked; + char trusted_listen[MAX_HOST]; + uint16_t trusted_port; + char trusted_allowed[MAX_DOWNSTREAM][MAX_HOST]; + char upstream_host[MAX_HOST]; + uint16_t upstream_port; + char pool_view_key[64]; } config_t; typedef struct block_template_t @@ -189,6 +204,8 @@ typedef struct client_t bool is_xnp; uint32_t mode; uint8_t bad_shares; + bool downstream; + uint32_t downstream_miners; } client_t; typedef struct pool_clients_t @@ -235,8 +252,9 @@ static config_t config; static pool_clients_t pool_clients; static bstack_t *bst; static bstack_t *bsh; -static struct event_base *base; +static struct event_base *pool_base; static struct event *listener_event; +static struct event *timer_30s; static struct event *timer_120s; static struct event *timer_10m; static struct event *signal_usr1; @@ -248,6 +266,7 @@ static MDB_dbi db_shares; static MDB_dbi db_blocks; static MDB_dbi db_balance; static MDB_dbi db_payments; +static MDB_dbi db_properties; static BN_CTX *bn_ctx; static BIGNUM *base_diff; static pool_stats_t pool_stats; @@ -256,6 +275,13 @@ static FILE *fd_log; static unsigned char sec_view[32]; static unsigned char pub_spend[32]; static uint8_t nettype; +static pthread_t trusted_th; +static struct event_base *trusted_base; +static struct event *trusted_event; +static struct bufferevent *upstream_event; +static struct event *timer_10s; +static time_t upstream_last_time; +static uint64_t upstream_last_height; #ifdef HAVE_RX extern void rx_stop_mining(); @@ -326,11 +352,7 @@ compare_block(const MDB_val *a, const MDB_val *b) { const block_t *va = (const block_t*) a->mv_data; const block_t *vb = (const block_t*) b->mv_data; - int sc = memcmp(va->hash, vb->hash, 64); - if (sc == 0) - return (va->timestamp < vb->timestamp) ? -1 : 1; - else - return sc; + return (va->timestamp < vb->timestamp) ? 
-1 : 1; } static int @@ -398,16 +420,28 @@ database_init(const char* data_dir) log_fatal("%s", err); exit(rc); } + if ((rc = mdb_dbi_open(txn, "properties", flags, &db_properties)) != 0) + { + err = mdb_strerror(rc); + log_fatal("%s", err); + exit(rc); + } + MDB_val k, v; + k.mv_data = "upstream_last_height"; + k.mv_size = strlen(k.mv_data); + if (!mdb_get(txn, db_properties, &k, &v)) + memcpy(&upstream_last_height, v.mv_data, v.mv_size); + k.mv_data = "upstream_last_time"; + k.mv_size = strlen(k.mv_data); + if (!mdb_get(txn, db_properties, &k, &v)) + memcpy(&upstream_last_time, v.mv_data, v.mv_size); mdb_set_compare(txn, db_shares, compare_uint64); mdb_set_dupsort(txn, db_shares, compare_share); - mdb_set_compare(txn, db_blocks, compare_uint64); mdb_set_dupsort(txn, db_blocks, compare_block); - mdb_set_compare(txn, db_payments, compare_string); mdb_set_dupsort(txn, db_payments, compare_payment); - mdb_set_compare(txn, db_balance, compare_string); rc = mdb_txn_commit(txn); @@ -422,6 +456,7 @@ database_close(void) mdb_dbi_close(env, db_blocks); mdb_dbi_close(env, db_balance); mdb_dbi_close(env, db_payments); + mdb_dbi_close(env, db_properties); mdb_env_close(env); } @@ -631,7 +666,7 @@ payout_block(block_t *block, MDB_txn *parent) /* PPLNS */ - log_info("Payout on block at height %"PRIu64, block->height); + log_info("Payout on block at height: %"PRIu64, block->height); int rc; char *err; MDB_txn *txn; @@ -678,7 +713,7 @@ payout_block(block_t *block, MDB_txn *parent) share_t *share = (share_t*)val.mv_data; uint64_t amount = floor((double)share->difficulty / - ((double)block->difficulty * config.share_mul) * block->reward); + ((double)block->difficulty * config.share_mul) * block->reward); if (total_paid + amount > block->reward) amount = block->reward - total_paid; total_paid += amount; @@ -742,12 +777,12 @@ process_blocks(block_t *blocks, size_t count) op = MDB_NEXT_DUP; if (rc == MDB_NOTFOUND || rc != 0) { - log_trace("No stored block at height %"PRIu64, ib->height); + log_trace("No stored block at height: %"PRIu64, ib->height); if (rc != MDB_NOTFOUND) { err = mdb_strerror(rc); - log_debug("No stored block at height %"PRIu64 - " with error: %d", + log_debug("No stored block at height: %"PRIu64 + ", with error: %d", ib->height, err); } break; @@ -763,7 +798,7 @@ process_blocks(block_t *blocks, size_t count) { log_trace("Orphaning because hashes differ: %.64s, %.64s", ib->hash, sb->hash); - log_debug("Orphaned block at height %"PRIu64, ib->height); + log_debug("Orphaned block at height: %"PRIu64, ib->height); nb.status |= BLOCK_ORPHANED; MDB_val new_val = {sizeof(block_t), (void*)&nb}; mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); @@ -781,7 +816,7 @@ process_blocks(block_t *blocks, size_t count) } if (ib->status & BLOCK_ORPHANED) { - log_debug("Orphaned block at height %"PRIu64, ib->height); + log_debug("Orphaned block at height: %"PRIu64, ib->height); nb.status |= BLOCK_ORPHANED; MDB_val new_val = {sizeof(block_t), (void*)&nb}; mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); @@ -789,10 +824,11 @@ process_blocks(block_t *blocks, size_t count) } nb.status |= BLOCK_UNLOCKED; nb.reward = ib->reward; - rc = payout_block(&nb, txn); - if (rc == 0) + if (!*config.upstream_host) + rc = payout_block(&nb, txn); + if (*config.upstream_host || rc == 0) { - log_debug("Paid out block %"PRIu64, nb.height); + log_debug("Paid out block: %"PRIu64, nb.height); MDB_val new_val = {sizeof(block_t), (void*)&nb}; mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); } @@ -865,7 +901,7 @@ 
retarget(client_t *client, job_t *job) { uint64_t target = client_target(client, job); job->target = target; - log_debug("Client %.32s target now %"PRIu64, client->client_id, target); + log_debug("Miner %.32s target now: %"PRIu64, client->client_id, target); } static void @@ -882,7 +918,7 @@ target_to_hex(uint64_t target, char *target_hex) #ifdef SIXTY_FOUR_BIT_LONG BN_set_word(bnt, target); #else - char tmp[24]; + char tmp[24] = {0}; snprintf(tmp, 24, "%"PRIu64, target); BN_dec2bn(&bnt, tmp); #endif @@ -965,7 +1001,7 @@ stratum_get_job_body_ss(char *body, const client_t *client, bool response) seed_hash = job->miner_template->seed_hash; next_seed_hash = job->miner_template->next_seed_hash; } - unsigned char extra_bin[8]; + unsigned char extra_bin[8] = {0}; memcpy(extra_bin, &job->extra_nonce, 4); memcpy(extra_bin+4, &instance_id, 4); char extra_hex[17] = {0}; @@ -1057,7 +1093,7 @@ static void send_validation_error(const client_t *client, const char *message) { struct evbuffer *output = bufferevent_get_output(client->bev); - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, message); evbuffer_add(output, body, strlen(body)); log_debug("Validation error: %s", message); @@ -1109,7 +1145,7 @@ client_find_job(client_t *client, const char *job_id) } static void -client_send_job(client_t *client, bool response) +miner_send_job(client_t *client, bool response) { /* First cycle jobs */ job_t *last = &client->active_jobs[CLIENT_JOBS_MAX-1]; @@ -1153,9 +1189,9 @@ client_send_job(client_t *client, bool response) retarget(client, job); ++extra_nonce; job->extra_nonce = extra_nonce; - char body[JOB_BODY_MAX]; + char body[JOB_BODY_MAX] = {0}; stratum_get_job_body_ss(body, client, response); - log_trace("Client job: %s", body); + log_trace("Miner job: %s", body); struct evbuffer *output = bufferevent_get_output(client->bev); evbuffer_add(output, body, strlen(body)); return; @@ -1213,7 +1249,7 @@ client_send_job(client_t *client, bool response) /* Retarget */ retarget(client, job); - char body[JOB_BODY_MAX]; + char body[JOB_BODY_MAX] = {0}; if (!client->is_xnp) { stratum_get_job_body(body, client, response); @@ -1225,7 +1261,7 @@ client_send_job(client_t *client, bool response) stratum_get_proxy_job_body(body, client, block_hex, response); free(block_hex); } - log_trace("Client job: %s", body); + log_trace("Miner job: %.*s", strlen(body)-1, body); struct evbuffer *output = bufferevent_get_output(client->bev); evbuffer_add(output, body, strlen(body)); free(block); @@ -1238,9 +1274,9 @@ pool_clients_send_job(void) client_t *c = pool_clients.clients; for (size_t i = 0; i < pool_clients.count; i++, c++) { - if (c->fd == 0 || c->address[0] == 0) + if (c->fd == 0 || c->address[0] == 0 || c->downstream) continue; - client_send_job(c, false); + miner_send_job(c, false); } } @@ -1356,7 +1392,8 @@ rpc_on_response(struct evhttp_request *req, void *arg) } static void -rpc_request(struct event_base *base, const char *body, rpc_callback_t *callback) +rpc_request(struct event_base *base, const char *body, + rpc_callback_t *callback) { struct evhttp_connection *con; struct evhttp_request *req; @@ -1428,7 +1465,7 @@ rpc_get_request_body(char *body, const char *method, char *fmt, ...) 
break; case 'd': d = va_arg(args, uint64_t); - char tmp[24]; + char tmp[24] = {0}; snprintf(tmp, 24, "%"PRIu64, d); pb = stecpy(pb, tmp, end); break; @@ -1557,6 +1594,9 @@ startup_scan_round_shares() MDB_txn *txn; MDB_cursor *cursor; + if (*config.upstream_host) + return 0; + if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)) != 0) { err = mdb_strerror(rc); @@ -1639,21 +1679,22 @@ startup_payout(uint64_t height) break; pool_stats.pool_blocks_found++; - block_t *block = (block_t*)val.mv_data; - pool_stats.last_block_found = block->timestamp; + + if (!upstream_event) + pool_stats.last_block_found = block->timestamp; if (block->height > height - 60) continue; if (block->status != BLOCK_LOCKED) continue; - char body[RPC_BODY_MAX]; + char body[RPC_BODY_MAX] = {0}; rpc_get_request_body(body, "get_block_header_by_height", "sd", "height", block->height); rpc_callback_t *cb = rpc_callback_new( rpc_on_block_header_by_height, NULL); - rpc_request(base, body, cb); + rpc_request(pool_base, body, cb); } mdb_cursor_close(cursor); @@ -1740,12 +1781,12 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback) if (need_new_template) { log_info("Fetching new block template"); - char body[RPC_BODY_MAX]; + char body[RPC_BODY_MAX] = {0}; uint64_t reserve = 17; rpc_get_request_body(body, "get_block_template", "sssd", "wallet_address", config.pool_wallet, "reserve_size", reserve); rpc_callback_t *cb1 = rpc_callback_new(rpc_on_block_template, NULL); - rpc_request(base, body, cb1); + rpc_request(pool_base, body, cb1); uint64_t end = front->height - 60; uint64_t start = end - BLOCK_HEADERS_RANGE + 1; @@ -1753,7 +1794,7 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback) "start_height", start, "end_height", end); rpc_callback_t *cb2 = rpc_callback_new( rpc_on_block_headers_range, NULL); - rpc_request(base, body, cb2); + rpc_request(pool_base, body, cb2); } json_object_put(root); @@ -1787,8 +1828,11 @@ rpc_on_block_submitted(const char* data, rpc_callback_t *callback) } pool_stats.pool_blocks_found++; block_t *b = (block_t*)callback->data; - pool_stats.last_block_found = b->timestamp; - pool_stats.round_hashes = 0; + if (!upstream_event) + { + pool_stats.last_block_found = b->timestamp; + pool_stats.round_hashes = 0; + } log_info("Block submitted at height: %"PRIu64, b->height); int rc = store_block(b->height, b); if (rc != 0) @@ -1918,6 +1962,8 @@ cleanup: static int send_payments(void) { + if (*config.upstream_host) + return 0; uint64_t threshold = 1000000000000 * config.payment_threshold; int rc; char *err; @@ -1960,9 +2006,9 @@ send_payments(void) if (amount < threshold) continue; - log_info("Sending payment of %"PRIu64" to %s\n", amount, address); + log_info("Sending payment: %"PRIu64", %.8s", amount, address); - strncpy(payment->address, address, ADDRESS_MAX); + strncpy(payment->address, address, ADDRESS_MAX-1); payment->amount = amount; payments_count++; @@ -2004,7 +2050,7 @@ send_payments(void) log_trace(body); rpc_callback_t *cb = rpc_callback_new( rpc_on_wallet_transferred, payments); - rpc_wallet_request(base, body, cb); + rpc_wallet_request(pool_base, body, cb); } else free(payments); @@ -2015,20 +2061,502 @@ send_payments(void) static void fetch_view_key(void) { - char body[RPC_BODY_MAX]; + if (*config.pool_view_key) + { + hex_to_bin(config.pool_view_key, 64, sec_view, 32); + log_info("Using pool view-key: %.4s", config.pool_view_key); + return; + } + if (*sec_view) + return; + char body[RPC_BODY_MAX] = {0}; rpc_get_request_body(body, "query_key", "ss", "key_type", 
"view_key"); rpc_callback_t *cb = rpc_callback_new(rpc_on_view_key, NULL); - rpc_wallet_request(base, body, cb); + rpc_wallet_request(pool_base, body, cb); } static void fetch_last_block_header(void) { log_info("Fetching last block header"); - char body[RPC_BODY_MAX]; + char body[RPC_BODY_MAX] = {0}; rpc_get_request_body(body, "get_last_block_header", NULL); rpc_callback_t *cb = rpc_callback_new(rpc_on_last_block_header, NULL); - rpc_request(base, body, cb); + rpc_request(pool_base, body, cb); +} + +static int +store_last_height_time() +{ + int rc; + char *err; + MDB_txn *txn; + MDB_val k, v; + if ((rc = mdb_txn_begin(env, NULL, 0, &txn))) + { + err = mdb_strerror(rc); + log_error("%s", err); + return rc; + } + k.mv_data = "upstream_last_height"; + k.mv_size = strlen(k.mv_data); + v.mv_data = &upstream_last_height; + v.mv_size = sizeof(upstream_last_height); + if ((rc = mdb_put(txn, db_properties, &k, &v, 0))) + { + err = mdb_strerror(rc); + log_error("%s", err); + mdb_txn_abort(txn); + return rc; + } + k.mv_data = "upstream_last_time"; + k.mv_size = strlen(k.mv_data); + v.mv_data = &upstream_last_time; + v.mv_size = sizeof(upstream_last_time); + if ((rc = mdb_put(txn, db_properties, &k, &v, 0))) + { + err = mdb_strerror(rc); + log_error("%s", err); + mdb_txn_abort(txn); + return rc; + } + if ((rc = mdb_txn_commit(txn))) + { + err = mdb_strerror(rc); + log_error("%s", err); + return rc; + } + return rc; +} + +static void +trusted_send_stats(client_t *client) +{ + struct evbuffer *output = bufferevent_get_output(client->bev); + size_t z = 9 + sizeof(pool_stats); + char data[z]; + int t = BIN_STATS; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + memcpy(data+9, &pool_stats, z-9); + evbuffer_add(output, data, z); +} + +static void +trusted_send_balance(client_t *client, const char *address) +{ + struct evbuffer *output = bufferevent_get_output(client->bev); + size_t z = 9 + sizeof(uint64_t) + ADDRESS_MAX; + char data[z]; + int t = BIN_BALANCE; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + uint64_t balance = miner_balance(address); + memcpy(data+9, &balance, sizeof(uint64_t)); + memcpy(data+9+sizeof(uint64_t), address, ADDRESS_MAX); + evbuffer_add(output, data, z); +} + +static void +upstream_send_ping() +{ + struct evbuffer *output = bufferevent_get_output(upstream_event); + char data[9]; + int t = BIN_PING; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + evbuffer_add(output, data, 9); + log_trace("Sending message ping upstream"); +} + +static void +upstream_send_client_connect() +{ + struct evbuffer *output = bufferevent_get_output(upstream_event); + char data[9]; + int t = BIN_CONNECT; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + evbuffer_add(output, data, 9); + log_trace("Sending message connect upstream"); +} + +static void +upstream_send_client_disconnect() +{ + struct evbuffer *output = bufferevent_get_output(upstream_event); + char data[9]; + int t = BIN_DISCONNECT; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + evbuffer_add(output, data, 9); + log_trace("Sending message disconnect upstream"); +} + +static void +upstream_send_client_share(share_t *share) +{ + struct evbuffer *output = bufferevent_get_output(upstream_event); + size_t z = 9 + sizeof(share_t); + char data[z]; + int t = BIN_SHARE; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + memcpy(data+9, share, z-9); + evbuffer_add(output, data, z); + bool update_req = false; + if (share->height > upstream_last_height) + { + upstream_last_height = share->height; + update_req = true; + } + if 
(share->timestamp > upstream_last_time) + { + upstream_last_time = share->timestamp; + update_req = true; + } + if (update_req) + store_last_height_time(); + log_trace("Sending share upstream: %"PRIu64", %"PRIu64", %"PRIu64, + share->difficulty, share->height, share->timestamp); +} + +static void +upstream_send_client_block(block_t *block) +{ + struct evbuffer *output = bufferevent_get_output(upstream_event); + size_t z = 9 + sizeof(block_t); + char data[z]; + int t = BIN_BLOCK; + memcpy(data, msgbin, 8); + memcpy(data+8, &t, 1); + memcpy(data+9, block, z-9); + evbuffer_add(output, data, z); + bool update_req = false; + if (block->height > upstream_last_height) + { + upstream_last_height = block->height; + update_req = true; + } + if (block->timestamp > upstream_last_time) + { + upstream_last_time = block->timestamp; + update_req = true; + } + if (update_req) + store_last_height_time(); + log_info("Sending block upstream: %.8s, %d, %d", + block->hash, block->height, block->timestamp); +} + +static void +upstream_send_backlog() +{ + /* + Send any unsent shares and blocks upstream. + */ + int rc; + char *err; + MDB_txn *txn; + MDB_cursor *curshr, *curblk; + if (!upstream_last_height || !upstream_last_time || !upstream_event) + return; + log_info("Sending upstream shares/blocks since: %"PRIu64", %"PRIu64, + upstream_last_height, upstream_last_time); + if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn))) + { + err = mdb_strerror(rc); + log_error("%s", err); + return; + } + if ((rc = mdb_cursor_open(txn, db_shares, &curshr))) + { + err = mdb_strerror(rc); + log_error("%s", err); + mdb_txn_abort(txn); + return; + } + if ((rc = mdb_cursor_open(txn, db_blocks, &curblk))) + { + err = mdb_strerror(rc); + log_error("%s", err); + mdb_txn_abort(txn); + return; + } + uint64_t h = upstream_last_height; + time_t t = upstream_last_time; + MDB_cursor_op op = MDB_SET; + while (1) + { + MDB_val k, v; + if (op == MDB_SET) + { + k.mv_size = sizeof(h); + k.mv_data = &h; + } + if (mdb_cursor_get(curshr, &k, &v, op)) + break; + op = MDB_NEXT; + share_t *s = (share_t*) v.mv_data; + if (s->timestamp <= t) + continue; + upstream_send_client_share(s); + } + op = MDB_SET; + while (1) + { + MDB_val k, v; + if (op == MDB_SET) + { + k.mv_size = sizeof(h); + k.mv_data = &h; + } + if (mdb_cursor_get(curblk, &k, &v, op)) + break; + op = MDB_NEXT; + block_t *b = (block_t*) v.mv_data; + if (b->timestamp <= t) + continue; + upstream_send_client_block(b); + } + mdb_cursor_close(curshr); + mdb_cursor_close(curblk); + mdb_txn_abort(txn); +} + +static void +trusted_on_client_connect(client_t *client) +{ + pool_stats.connected_miners++; + client->downstream_miners++; + log_trace("Downstream miner connected. " + "Miner count: %d, Pool hashrate: %"PRIu64, + pool_stats.connected_miners, pool_stats.pool_hashrate); + trusted_send_stats(client); + if (upstream_event) + upstream_send_client_connect(); +} + +static void +trusted_on_client_disconnect(client_t *client) +{ + pool_stats.connected_miners--; + if (client->downstream_miners) + client->downstream_miners--; + log_trace("Downstream miner disconnected. " + "Miner count: %d, Pool hashrate: %"PRIu64, + pool_stats.connected_miners, pool_stats.pool_hashrate); + trusted_send_stats(client); + if (upstream_event) + upstream_send_client_disconnect(); +} + +static void +trusted_on_client_share(client_t *client) +{ + /* + Downstream validated, so just store for payouts. 
+ */ + struct evbuffer *input = bufferevent_get_input(client->bev); + share_t s; + int rc; + evbuffer_remove(input, (void*)&s, sizeof(share_t)); + log_debug("Received share from downstream with difficulty: %"PRIu64, + s.difficulty); + client->hashes += s.difficulty; + pool_stats.round_hashes += s.difficulty; + rc = store_share(s.height, &s); + if (rc != 0) + log_warn("Failed to store share: %s", mdb_strerror(rc)); + trusted_send_stats(client); + trusted_send_balance(client, s.address); + if (upstream_event) + upstream_send_client_share(&s); +} + +static void +trusted_on_client_block(client_t *client) +{ + struct evbuffer *input = bufferevent_get_input(client->bev); + block_t b; + int rc; + evbuffer_remove(input, (void*)&b, sizeof(block_t)); + pool_stats.pool_blocks_found++; + pool_stats.last_block_found = b.timestamp; + pool_stats.round_hashes = 0; + log_info("Block submitted by downstream: %.8s, %"PRIu64, b.hash, b.height); + rc = store_block(b.height, &b); + if (rc != 0) + log_warn("Failed to store block: %s", mdb_strerror(rc)); + trusted_send_stats(client); + if (upstream_event) + upstream_send_client_block(&b); +} + +static void +upstream_on_stats(struct bufferevent *bev) +{ + struct evbuffer *input = bufferevent_get_input(bev); + evbuffer_remove(input, &pool_stats, sizeof(pool_stats_t)); + log_trace("Stats from upstream: " + "%d, %"PRIu64", %"PRIu64", %d, %"PRIu64, + pool_stats.connected_miners, + pool_stats.pool_hashrate, + pool_stats.round_hashes, + pool_stats.pool_blocks_found, + pool_stats.last_block_found); +} + +static int +upstream_on_balance(struct bufferevent *bev) +{ + int rc; + char *err; + MDB_txn *txn; + uint64_t balance; + char address[ADDRESS_MAX]; + struct evbuffer *input = bufferevent_get_input(bev); + evbuffer_remove(input, &balance, sizeof(uint64_t)); + evbuffer_remove(input, address, ADDRESS_MAX); + log_trace("Balance from upstream: %.8s, %"PRIu64, address, balance); + if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); + return rc; + } + MDB_val k = {ADDRESS_MAX, (void*)address}; + MDB_val v = {sizeof(uint64_t), (void*)&balance}; + if ((rc = mdb_put(txn, db_balance, &k, &v, 0))) + { + err = mdb_strerror(rc); + log_error("%s", err); + mdb_txn_abort(txn); + return rc; + } + rc = mdb_txn_commit(txn); + return rc; +} + +static void +upstream_on_read(struct bufferevent *bev, void *ctx) +{ + struct evbuffer *input = bufferevent_get_input(bev); + struct evbuffer_ptr tag; + unsigned char tnt[9]; + size_t len; + + input = bufferevent_get_input(bev); + while ((len = evbuffer_get_length(input)) >= 9) + { + tag = evbuffer_search(input, (const char*) msgbin, 8, NULL); + if (tag.pos < 0) + { + log_error("Bad message from upstream"); + evbuffer_drain(input, len); + return; + } + + evbuffer_remove(input, tnt, 9); + + + switch (tnt[8]) + { + case BIN_STATS: + upstream_on_stats(bev); + break; + case BIN_BALANCE: + upstream_on_balance(bev); + break; + default: + log_error("Unsupported message type: %d", tnt[8]); + evbuffer_drain(input, evbuffer_get_length(input)); + return; + } + } +} + +static void +upstream_on_event(struct bufferevent *bev, short error, void *ctx) +{ + if (error & BEV_EVENT_CONNECTED) + { + log_info("Connected to upstream: %s:%d", + config.upstream_host, config.upstream_port); + upstream_send_backlog(); + return; + } + if (error & BEV_EVENT_EOF) + { + log_debug("Upstream disconnected"); + } + else if (error & BEV_EVENT_ERROR) + { + log_debug("Upstream connection error: %d", errno); + } + else if (error & 
BEV_EVENT_TIMEOUT) + { + log_debug("Upstream timeout"); + } + /* Wait and try to reconnect */ + if (upstream_event) + { + bufferevent_free(upstream_event); + upstream_event = NULL; + } + log_warn("No connection to upstream; retrying in 10s"); + struct timeval timeout = { .tv_sec = 10, .tv_usec = 0 }; + evtimer_add(timer_10s, &timeout); +} + +static void +upstream_connect() +{ + struct addrinfo *info; + int rc; + char port[6] = {0}; + + sprintf(port, "%d", config.upstream_port); + if ((rc = getaddrinfo(config.upstream_host, port, 0, &info))) + { + log_fatal("Error parsing upstream host: %s", gai_strerror(rc)); + return; + } + + upstream_event = bufferevent_socket_new(pool_base, -1, + BEV_OPT_CLOSE_ON_FREE); + + if (bufferevent_socket_connect(upstream_event, + info->ai_addr, info->ai_addrlen) < 0) + { + perror("connect"); + goto bail; + } + + bufferevent_setcb(upstream_event, + upstream_on_read, NULL, upstream_on_event, NULL); + bufferevent_enable(upstream_event, EV_READ|EV_WRITE); + evutil_make_socket_nonblocking(bufferevent_getfd(upstream_event)); + +bail: + freeaddrinfo(info); +} + +static void +timer_on_10s(int fd, short kind, void *ctx) +{ + log_info("Reconnecting to upstream: %s:%d", + config.upstream_host, config.upstream_port); + upstream_connect(); +} + +static void +timer_on_30s(int fd, short kind, void *ctx) +{ + if (upstream_event) + upstream_send_ping(); + struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 }; + evtimer_add(timer_30s, &timeout); } static void @@ -2049,9 +2577,8 @@ timer_on_10m(int fd, short kind, void *ctx) } static void -client_add(int fd, struct bufferevent *bev) +client_add(int fd, struct bufferevent *bev, bool downstream) { - log_info("New client connected"); client_t *c = pool_clients.clients; bool resize = true; for (size_t i = 0; i < pool_clients.count; i++, c++) @@ -2079,7 +2606,11 @@ client_add(int fd, struct bufferevent *bev) c->fd = fd; c->bev = bev; c->connected_since = time(NULL); - pool_stats.connected_miners++; + c->downstream = downstream; + if (!downstream) + pool_stats.connected_miners++; + if (upstream_event) + upstream_send_client_connect(); } static void @@ -2111,13 +2642,18 @@ client_clear(struct bufferevent *bev) if (!client) return; client_clear_jobs(client); + if (client->downstream) + pool_stats.connected_miners -= client->downstream_miners; + else + pool_stats.connected_miners--; memset(client, 0, sizeof(client_t)); bufferevent_free(bev); - pool_stats.connected_miners--; + if (upstream_event) + upstream_send_client_disconnect(); } static void -client_on_login(json_object *message, client_t *client) +miner_on_login(json_object *message, client_t *client) { JSON_GET_OR_ERROR(params, message, json_type_object, client); JSON_GET_OR_ERROR(login, params, json_type_string, client); @@ -2140,7 +2676,7 @@ client_on_login(json_object *message, client_t *client) return; } client->mode = MODE_SELF_SELECT; - log_trace("Client login for mode: self-select"); + log_trace("Miner login for mode: self-select"); } } } @@ -2181,17 +2717,17 @@ client_on_login(json_object *message, client_t *client) return; } - strncpy(client->address, address, sizeof(client->address)); - strncpy(client->worker_id, worker_id, sizeof(client->worker_id)); + strncpy(client->address, address, sizeof(client->address)-1); + strncpy(client->worker_id, worker_id, sizeof(client->worker_id)-1); uuid_t cid; uuid_generate(cid); bin_to_hex((const unsigned char*)cid, sizeof(uuid_t), client->client_id, 32); - client_send_job(client, true); + miner_send_job(client, true); } static void 
-client_on_block_template(json_object *message, client_t *client) +miner_on_block_template(json_object *message, client_t *client) { struct evbuffer *output = bufferevent_get_output(client->bev); @@ -2214,8 +2750,9 @@ client_on_block_template(json_object *message, client_t *client) int64_t dh = llabs(h - (int64_t)pool_stats.network_height); if (dh > TEMLATE_HEIGHT_VARIANCE) { - char m[64]; - snprintf(m, 64, "Bad height. Differs to pool by %"PRIu64" blocks.", dh); + char m[64] = {0}; + snprintf(m, 64, "Bad height. " + "Differs to pool by %"PRIu64" blocks.", dh); send_validation_error(client, m); return; } @@ -2266,14 +2803,14 @@ client_on_block_template(json_object *message, client_t *client) json_object_get_string(next_seed_hash), 64); } - log_trace("Client set template: %s", btb); - char body[STATUS_BODY_MAX]; + log_trace("Miner set template: %s", btb); + char body[STATUS_BODY_MAX] = {0}; stratum_get_status_body(body, client->json_id, "OK"); evbuffer_add(output, body, strlen(body)); } static void -client_on_submit(json_object *message, client_t *client) +miner_on_submit(json_object *message, client_t *client) { struct evbuffer *output = bufferevent_get_output(client->bev); @@ -2319,7 +2856,7 @@ client_on_submit(json_object *message, client_t *client) return; } - log_trace("Client submitted nonce=%u, result=%s", + log_trace("Miner submitted nonce=%u, result=%s", result_nonce, result_hex); /* 1. Validate submission @@ -2335,8 +2872,8 @@ client_on_submit(json_object *message, client_t *client) check result hash against template difficulty (submit to network if good) add share to db - Note reserved space is: extra_nonce, instance_id, pool_nonce, worker_nonce - 4 bytes each. + Note reserved space is: extra_nonce|instance_id|pool_nonce|worker_nonce + 4 bytes each. */ /* Convert template to blob */ @@ -2400,7 +2937,7 @@ client_on_submit(json_object *message, client_t *client) { if (submissions[i] == sub) { - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, "Duplicate share"); evbuffer_add(output, body, strlen(body)); log_debug("Duplicate share"); @@ -2423,7 +2960,7 @@ client_on_submit(json_object *message, client_t *client) if (get_hashing_blob(block, bin_size, &hashing_blob, &hashing_blob_size) != 0) { - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, "Invalid block"); evbuffer_add(output, body, strlen(body)); log_debug("Invalid block"); @@ -2438,7 +2975,7 @@ client_on_submit(json_object *message, client_t *client) uint8_t pow_variant = major_version >= 7 ? 
major_version - 6 : 0; if (pow_variant >= 6) { - unsigned char seed_hash[32]; + unsigned char seed_hash[32] = {0}; hex_to_bin(bt->seed_hash, 64, seed_hash, 32); get_rx_hash(hashing_blob, hashing_blob_size, (unsigned char*)result_hash, seed_hash, bt->height); @@ -2452,7 +2989,7 @@ client_on_submit(json_object *message, client_t *client) if (memcmp(submitted_hash, result_hash, 32) != 0) { - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, "Invalid share"); evbuffer_add(output, body, strlen(body)); log_debug("Invalid share"); @@ -2487,7 +3024,7 @@ client_on_submit(json_object *message, client_t *client) log_info("+++ MINED A BLOCK +++"); char *block_hex = calloc((bin_size << 1)+1, sizeof(char)); bin_to_hex(block, bin_size, block_hex, bin_size << 1); - char body[RPC_BODY_MAX]; + char body[RPC_BODY_MAX] = {0}; snprintf(body, RPC_BODY_MAX, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":" "\"submit_block\", \"params\":[\"%s\"]}", @@ -2505,14 +3042,15 @@ client_on_submit(json_object *message, client_t *client) b->difficulty = bt->difficulty; b->status = BLOCK_LOCKED; b->timestamp = now; - - rpc_request(base, body, cb); + if (upstream_event) + upstream_send_client_block(b); + rpc_request(pool_base, body, cb); free(block_hex); } else if (BN_cmp(hd, jd) < 0) { can_store = false; - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, "Low difficulty share"); evbuffer_add(output, body, strlen(body)); log_debug("Low difficulty (%lu) share", BN_get_word(jd)); @@ -2532,27 +3070,30 @@ client_on_submit(json_object *message, client_t *client) share_t share; share.height = bt->height; share.difficulty = job->target; - strncpy(share.address, client->address, sizeof(share.address)); + strncpy(share.address, client->address, sizeof(share.address)-1); share.timestamp = now; - pool_stats.round_hashes += share.difficulty; + if (!upstream_event) + pool_stats.round_hashes += share.difficulty; log_debug("Storing share with difficulty: %"PRIu64, share.difficulty); int rc = store_share(share.height, &share); if (rc != 0) log_warn("Failed to store share: %s", mdb_strerror(rc)); - char body[STATUS_BODY_MAX]; + char body[STATUS_BODY_MAX] = {0}; stratum_get_status_body(body, client->json_id, "OK"); evbuffer_add(output, body, strlen(body)); + if (upstream_event) + upstream_send_client_share(&share); } if (retarget_required(client, job)) { log_debug("Sending an early job as this was less than %u%% of" " potential", (unsigned)(100.*config.retarget_ratio)); - client_send_job(client, false); + miner_send_job(client, false); } } static void -client_on_read(struct bufferevent *bev, void *ctx) +miner_on_read(struct bufferevent *bev, void *ctx) { const char *unknown_method = "Removing client. Unknown method called."; const char *too_bad = "Removing client. 
Too many bad shares."; @@ -2573,7 +3114,7 @@ client_on_read(struct bufferevent *bev, void *ctx) size_t len = evbuffer_get_length(input); if (len > MAX_LINE) { - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, too_long); evbuffer_add(output, body, strlen(body)); log_info(too_long); @@ -2588,7 +3129,7 @@ client_on_read(struct bufferevent *bev, void *ctx) if (!message) { free(line); - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, invalid_json); evbuffer_add(output, body, strlen(body)); log_info(invalid_json); @@ -2609,23 +3150,23 @@ client_on_read(struct bufferevent *bev, void *ctx) } else if (strcmp(method_name, "login") == 0) { - client_on_login(message, client); + miner_on_login(message, client); } else if (strcmp(method_name, "block_template") == 0) { - client_on_block_template(message, client); + miner_on_block_template(message, client); } else if (strcmp(method_name, "submit") == 0) { - client_on_submit(message, client); + miner_on_submit(message, client); } else if (strcmp(method_name, "getjob") == 0) { - client_send_job(client, false); + miner_send_job(client, false); } else if (strcmp(method_name, "keepalived") == 0) { - char body[STATUS_BODY_MAX]; + char body[STATUS_BODY_MAX] = {0}; stratum_get_status_body(body, client->json_id, "KEEPALIVED"); evbuffer_add(output, body, strlen(body)); } @@ -2639,7 +3180,7 @@ client_on_read(struct bufferevent *bev, void *ctx) if (unknown) { - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, unknown_method); evbuffer_add(output, body, strlen(body)); log_info(unknown_method); @@ -2649,7 +3190,7 @@ client_on_read(struct bufferevent *bev, void *ctx) } if (client->bad_shares > MAX_BAD_SHARES) { - char body[ERROR_BODY_MAX]; + char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, too_bad); evbuffer_add(output, body, strlen(body)); log_info(too_bad); @@ -2661,30 +3202,92 @@ client_on_read(struct bufferevent *bev, void *ctx) } static void -client_on_error(struct bufferevent *bev, short error, void *ctx) +trusted_on_read(struct bufferevent *bev, void *ctx) { + struct evbuffer *input; + client_t *client = NULL; + struct evbuffer_ptr tag; + unsigned char tnt[9]; + size_t len; + + client_find(bev, &client); + if (!client) + return; + if (!client->downstream) + { + /* should never happen; sanity check */ + log_trace("Only trusted downstreams allowed"); + client_clear(bev); + return; + } + + input = bufferevent_get_input(bev); + + while ((len = evbuffer_get_length(input)) >= 9) + { + tag = evbuffer_search(input, (const char*) msgbin, 8, NULL); + if (tag.pos < 0) + { + log_error("Bad message from downstream"); + evbuffer_drain(input, len); + client_clear(bev); + return; + } + + evbuffer_remove(input, tnt, 9); + log_trace("Downstream message: %d", tnt[8]); + switch (tnt[8]) + { + case BIN_PING: + case BIN_STATS: + trusted_send_stats(client); + break; + case BIN_CONNECT: + trusted_on_client_connect(client); + break; + case BIN_DISCONNECT: + trusted_on_client_disconnect(client); + break; + case BIN_SHARE: + trusted_on_client_share(client); + break; + case BIN_BLOCK: + trusted_on_client_block(client); + break; + default: + log_error("Unknown message: %d", tnt[8]); + evbuffer_drain(input, len); + client_clear(bev); + return; + } + } +} + +static void +listener_on_error(struct bufferevent *bev, short error, void *ctx) +{ + struct event_base *base = (struct 
event_base*)ctx; + char *type = base != trusted_base ? "Miner" : "Downstream"; if (error & BEV_EVENT_EOF) { - /* connection has been closed */ - log_debug("Client disconnected. Removing."); + log_debug("%s disconnected. Removing.", type); } else if (error & BEV_EVENT_ERROR) { - /* check errno to see what error occurred */ - log_debug("Client error: %d. Removing.", errno); + log_debug("%s error: %d. Removing.", type, errno); } else if (error & BEV_EVENT_TIMEOUT) { - /* must be a timeout event handle, handle it */ - log_debug("Client timeout. Removing."); + log_debug("%s timeout. Removing.", type); } client_clear(bev); } static void -client_on_accept(evutil_socket_t listener, short event, void *arg) +listener_on_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = (struct event_base*)arg; + char *type = base != trusted_base ? "miner" : "downstream"; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); @@ -2693,12 +3296,46 @@ client_on_accept(evutil_socket_t listener, short event, void *arg) perror("accept"); return; } + if (base == trusted_base && *config.trusted_allowed[0]) + { + char *s = config.trusted_allowed[0]; + char *e = s + (MAX_DOWNSTREAM * MAX_HOST); + char host[MAX_HOST]; + bool match = false; + int rc = 0; + if ((rc = getnameinfo((struct sockaddr*)&ss, slen, + host, MAX_HOST, NULL, 0, NI_NUMERICHOST))) + { + log_error("Error parsing trusted allowed address: %s", + gai_strerror(rc)); + return; + } + while (s < e) + { + if (strncmp(s, host, MAX_HOST) == 0) + { + match = true; + break; + } + s += MAX_HOST; + } + if (!match) + { + close(fd); + log_error("Host %s not allowed as trusted downstream", host); + return; + } + } struct bufferevent *bev; evutil_make_socket_nonblocking(fd); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); - bufferevent_setcb(bev, client_on_read, NULL, client_on_error, NULL); + bufferevent_setcb(bev, + base == trusted_base ? trusted_on_read : miner_on_read, + NULL, listener_on_error, arg); bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); - client_add(fd, bev); + client_add(fd, bev, base == trusted_base); + log_info("New %s connected. Miner count: %d, Pool hashrate: %"PRIu64, + type, pool_stats.connected_miners, pool_stats.pool_hashrate); bufferevent_enable(bev, EV_READ|EV_WRITE); } @@ -2727,7 +3364,7 @@ read_config(const char *config_file) char path[MAX_PATH] = {0}; if (config_file) { - strncpy(path, config_file, MAX_PATH); + strncpy(path, config_file, MAX_PATH-1); } else { @@ -2739,7 +3376,7 @@ read_config(const char *config_file) strcat(path, "/pool.conf"); if (access(path, R_OK) != 0) { - strncpy(path, getenv("HOME"), MAX_PATH); + strncpy(path, getenv("HOME"), MAX_PATH-1); strcat(path, "/pool.conf"); if (access(path, R_OK) != 0) { @@ -2749,7 +3386,7 @@ read_config(const char *config_file) } } } - log_info("Reading config at: %s", path); + log_info("Reading config from: %s", path); FILE *fp = fopen(path, "r"); if (!fp) @@ -2757,7 +3394,7 @@ read_config(const char *config_file) log_fatal("Cannot open config file. 
Aborting."); exit(-1); } - char line[1024]; + char line[1024] = {0}; char *key; char *val; const char *tok = " ="; @@ -2774,7 +3411,7 @@ read_config(const char *config_file) val[strcspn(val, "\r\n")] = 0; if (strcmp(key, "pool-listen") == 0) { - strncpy(config.pool_listen, val, sizeof(config.pool_listen)); + strncpy(config.pool_listen, val, sizeof(config.pool_listen)-1); } else if (strcmp(key, "pool-port") == 0) { @@ -2790,7 +3427,7 @@ read_config(const char *config_file) } else if (strcmp(key, "rpc-host") == 0) { - strncpy(config.rpc_host, val, sizeof(config.rpc_host)); + strncpy(config.rpc_host, val, sizeof(config.rpc_host)-1); } else if (strcmp(key, "rpc-port") == 0) { @@ -2798,7 +3435,7 @@ read_config(const char *config_file) } else if (strcmp(key, "wallet-rpc-host") == 0) { - strncpy(config.wallet_rpc_host, val, sizeof(config.rpc_host)); + strncpy(config.wallet_rpc_host, val, sizeof(config.rpc_host)-1); } else if (strcmp(key, "wallet-rpc-port") == 0) { @@ -2810,7 +3447,7 @@ read_config(const char *config_file) } else if (strcmp(key, "pool-wallet") == 0) { - strncpy(config.pool_wallet, val, sizeof(config.pool_wallet)); + strncpy(config.pool_wallet, val, sizeof(config.pool_wallet)-1); } else if (strcmp(key, "pool-start-diff") == 0) { @@ -2842,7 +3479,7 @@ read_config(const char *config_file) } else if (strcmp(key, "log-file") == 0) { - strncpy(config.log_file, val, sizeof(config.log_file)); + strncpy(config.log_file, val, sizeof(config.log_file)-1); } else if (strcmp(key, "block-notified") == 0) { @@ -2854,16 +3491,53 @@ read_config(const char *config_file) } else if (strcmp(key, "data-dir") == 0) { - strncpy(config.data_dir, val, sizeof(config.data_dir)); + strncpy(config.data_dir, val, sizeof(config.data_dir)-1); } else if (strcmp(key, "pid-file") == 0) { - strncpy(config.pid_file, val, sizeof(config.pid_file)); + strncpy(config.pid_file, val, sizeof(config.pid_file)-1); } else if (strcmp(key, "forked") == 0) { config.forked = atoi(val); } + else if (strcmp(key, "trusted-listen") == 0) + { + strncpy(config.trusted_listen, val, + sizeof(config.trusted_listen)-1); + } + else if (strcmp(key, "trusted-port") == 0) + { + config.trusted_port = atoi(val); + } + else if (strcmp(key, "trusted-allowed") == 0) + { + char *temp = strdup(val); + char *search = temp; + char *s = config.trusted_allowed[0]; + char *e = s + (MAX_DOWNSTREAM * MAX_HOST); + char *ip; + while ((ip = strsep(&search, " ,")) && s < e) + { + if (!strlen(ip)) + continue; + strncpy(s, ip, MAX_HOST-1); + s += MAX_HOST; + } + free(temp); + } + else if (strcmp(key, "upstream-host") == 0) + { + strncpy(config.upstream_host, val, sizeof(config.upstream_host)-1); + } + else if (strcmp(key, "upstream-port") == 0) + { + config.upstream_port = atoi(val); + } + else if (strcmp(key, "pool-view-key") == 0 && strlen(val) == 64) + { + memcpy(config.pool_view_key, val, 64); + } } fclose(fp); @@ -2886,14 +3560,44 @@ read_config(const char *config_file) if (config.retarget_ratio < 0 || config.retarget_ratio > 1) { log_fatal("Set retarget-ratio to any rational value within range " - "[0, 1]. Clients will receive new jobs earlier if their latest" + "[0, 1]. Miners will receive new jobs earlier if their latest" " work is less than retarget-ratio percentage of potential."); exit(-1); } - + if (*config.upstream_host + && strcmp(config.upstream_host, config.pool_listen) == 0 + && config.upstream_port == config.pool_port) + { + log_fatal("Cannot point upstream to the pool. 
Aborting."); + exit(-1); + } + if (*config.upstream_host + && strcmp(config.upstream_host, config.trusted_listen) == 0 + && config.upstream_port == config.trusted_port) + { + log_fatal("Cannot point upstream to this trusted listener. Aborting."); + exit(-1); + } } + static void print_config() { + char display_allowed[MAX_HOST*MAX_DOWNSTREAM] = {0}; + if (*config.trusted_allowed[0]) + { + char *s = display_allowed; + char *e = display_allowed + sizeof(display_allowed); + char *f = config.trusted_allowed[0]; + char *l = f + (MAX_DOWNSTREAM * MAX_HOST); + s = stecpy(s, f, e); + f += MAX_HOST; + while(*f && f < l) + { + s = stecpy(s, ",", e); + s = stecpy(s, f, e); + f += MAX_HOST; + } + } log_info("\nCONFIG:\n" " pool-listen = %s\n" " pool-port = %u\n" @@ -2917,7 +3621,12 @@ static void print_config() " disable-self-select = %u\n" " data-dir = %s\n" " pid-file = %s\n" - " forked = %u\n", + " forked = %u\n" + " trusted-listen = %s\n" + " trusted-port = %u\n" + " trusted-allowed = %s\n" + " upstream-host = %s\n" + " upstream-port = %u\n", config.pool_listen, config.pool_port, config.pool_ssl_port, @@ -2940,7 +3649,12 @@ static void print_config() config.disable_self_select, config.data_dir, config.pid_file, - config.forked); + config.forked, + config.trusted_listen, + config.trusted_port, + display_allowed, + config.upstream_host, + config.upstream_port); } static void @@ -2957,16 +3671,81 @@ sigint_handler(int sig) exit(0); } +static void * +trusted_run(void *ctx) +{ + evutil_socket_t listener; + struct addrinfo *info; + int rc; + char port[6] = {0}; + + trusted_base = event_base_new(); + if (!trusted_base) + { + log_fatal("Failed to create trusted event base"); + return 0; + } + + sprintf(port, "%d", config.trusted_port); + if ((rc = getaddrinfo(config.trusted_listen, port, 0, &info))) + { + log_fatal("Error parsing trusted listen address: %s", + gai_strerror(rc)); + return 0; + } + + listener = socket(info->ai_family, SOCK_STREAM, 0); + evutil_make_socket_nonblocking(listener); + +#ifndef WIN32 + { + int one = 1; + setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + } +#endif + + if (bind(listener, info->ai_addr, info->ai_addrlen) < 0) + { + perror("bind"); + goto bail; + } + + freeaddrinfo(info); + info = NULL; + + if (listen(listener, 16)<0) + { + perror("listen"); + goto bail; + } + + trusted_event = event_new(trusted_base, listener, EV_READ|EV_PERSIST, + listener_on_accept, (void*)trusted_base); + if (event_add(trusted_event, NULL) != 0) + { + log_fatal("Failed to add trusted socket listener event"); + goto bail; + } + + event_base_dispatch(trusted_base); + +bail: + if (info) + freeaddrinfo(info); + event_base_free(trusted_base); + return 0; +} + static void run(void) { evutil_socket_t listener; struct addrinfo *info; int rc; - char port[6]; + char port[6] = {0}; - base = event_base_new(); - if (!base) + pool_base = event_base_new(); + if (!pool_base) { log_fatal("Failed to create event base"); return; @@ -2992,57 +3771,98 @@ run(void) if (bind(listener, info->ai_addr, info->ai_addrlen) < 0) { perror("bind"); - return; + goto bail; } + freeaddrinfo(info); + info = NULL; + if (listen(listener, 16)<0) { perror("listen"); - return; + goto bail; } - listener_event = event_new(base, listener, EV_READ|EV_PERSIST, - client_on_accept, (void*)base); + listener_event = event_new(pool_base, listener, EV_READ|EV_PERSIST, + listener_on_accept, (void*)pool_base); if (event_add(listener_event, NULL) != 0) { log_fatal("Failed to add socket listener event"); - return; + goto bail; } - 
signal_usr1 = evsignal_new(base, SIGUSR1, sigusr1_handler, NULL); + signal_usr1 = evsignal_new(pool_base, SIGUSR1, sigusr1_handler, NULL); event_add(signal_usr1, NULL); + + if (*config.trusted_listen && config.trusted_port) + { + log_info("Starting trusted listener on: %s:%d", + config.trusted_listen, config.trusted_port); + if (pthread_create(&trusted_th, NULL, trusted_run, NULL)) + { + log_fatal("Cannot create trusted thread"); + goto bail; + } + pthread_detach(trusted_th); + } + + if (*config.upstream_host && config.upstream_port) + { + log_info("Starting upstream connection to: %s:%d", + config.upstream_host, config.upstream_port); + upstream_connect(); + } + if (!config.block_notified) { - timer_120s = evtimer_new(base, timer_on_120s, NULL); + timer_120s = evtimer_new(pool_base, timer_on_120s, NULL); timer_on_120s(-1, EV_TIMEOUT, NULL); } else fetch_last_block_header(); - fetch_view_key(); - - timer_10m = evtimer_new(base, timer_on_10m, NULL); + timer_10m = evtimer_new(pool_base, timer_on_10m, NULL); timer_on_10m(-1, EV_TIMEOUT, NULL); + if (*config.upstream_host) + { + timer_10s = evtimer_new(pool_base, timer_on_10s, NULL); + timer_30s = evtimer_new(pool_base, timer_on_30s, NULL); + timer_on_30s(-1, EV_TIMEOUT, NULL); + } - event_base_dispatch(base); + event_base_dispatch(pool_base); + +bail: + if (info) + freeaddrinfo(info); } static void cleanup(void) { log_info("Performing cleanup"); - if (listener_event) - event_free(listener_event); - if (config.webui_port) - stop_web_ui(); - if (signal_usr1) - event_free(signal_usr1); + if (timer_10s) + event_free(timer_10s); + if (timer_30s) + event_free(timer_30s); if (timer_120s) event_free(timer_120s); if (timer_10m) event_free(timer_10m); - if (base) - event_base_free(base); + if (listener_event) + event_free(listener_event); + if (trusted_event) + event_free(trusted_event); + if (upstream_event) + bufferevent_free(upstream_event); + if (config.webui_port) + stop_web_ui(); + if (signal_usr1) + event_free(signal_usr1); + if (trusted_base) + event_base_loopbreak(trusted_base); + if (pool_base) + event_base_free(pool_base); pool_clients_free(); if (bsh) bstack_free(bsh); @@ -3073,8 +3893,6 @@ print_help(struct option *opts) int main(int argc, char **argv) { - int evthread_use_pthreads(void); - static struct option options[] = { {"config-file", required_argument, 0, 'c'}, @@ -3149,17 +3967,17 @@ int main(int argc, char **argv) /* Any supplied command line options take precedent... 
*/ if (log_file) { - strncpy(config.log_file, log_file, sizeof(config.log_file)); + strncpy(config.log_file, log_file, sizeof(config.log_file)-1); free(log_file); } if (data_dir) { - strncpy(config.data_dir, data_dir, sizeof(config.data_dir)); + strncpy(config.data_dir, data_dir, sizeof(config.data_dir)-1); free(data_dir); } if (pid_file) { - strncpy(config.pid_file, pid_file, sizeof(config.pid_file)); + strncpy(config.pid_file, pid_file, sizeof(config.pid_file)-1); free(pid_file); } if (forked > -1) @@ -3178,7 +3996,7 @@ int main(int argc, char **argv) } print_config(); - log_info("Starting pool"); + log_info("Starting pool on: %s:%d", config.pool_listen, config.pool_port); if (config.forked) { @@ -3189,6 +4007,8 @@ int main(int argc, char **argv) forkoff(pf); } + int evthread_use_pthreads(void); + signal(SIGINT, sigint_handler); signal(SIGPIPE, SIG_IGN); atexit(cleanup); @@ -3207,7 +4027,7 @@ int main(int argc, char **argv) bn_ctx = BN_CTX_new(); base_diff = NULL; BN_hex2bn(&base_diff, - "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"); + "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"); uuid_t iid; uuid_generate(iid); @@ -3219,7 +4039,7 @@ int main(int argc, char **argv) uic.port = config.webui_port; uic.pool_stats = &pool_stats; uic.pool_fee = config.pool_fee; - strncpy(uic.pool_listen, config.pool_listen, sizeof(uic.pool_listen)); + strncpy(uic.pool_listen, config.pool_listen, sizeof(uic.pool_listen)-1); uic.pool_port = config.pool_port; uic.pool_ssl_port = config.pool_ssl_port; uic.allow_self_select = !config.disable_self_select; diff --git a/src/webui.c b/src/webui.c index 1af4296..58ac5f1 100644 --- a/src/webui.c +++ b/src/webui.c @@ -159,7 +159,7 @@ thread_main(void *ctx) int start_web_ui(wui_context_t *context) { - log_debug("Starting Web UI"); + log_info("Starting Web UI on %s:%d", context->pool_listen, context->port); if (webui_base || handle) { log_error("Already running"); diff --git a/src/webui.h b/src/webui.h index dfc42ca..6b9cb1e 100644 --- a/src/webui.h +++ b/src/webui.h @@ -50,13 +50,13 @@ typedef struct pool_stats_t typedef struct wui_context_t { - uint32_t port; + uint16_t port; pool_stats_t *pool_stats; double pool_fee; double payment_threshold; - uint32_t pool_port; + uint16_t pool_port; char pool_listen[256]; - uint32_t pool_ssl_port; + uint16_t pool_ssl_port; unsigned allow_self_select; } wui_context_t; diff --git a/stratum-ss.md b/sss.md similarity index 100% rename from stratum-ss.md rename to sss.md