diff --git a/README.md b/README.md
index c889431..216ea9b 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,10 @@ this thanks to the efficiency of both LMDB and libevent (for the stratum
clients) and some sensible proxying/caching being placed in front of the [web
UI](#web-ui).
+Configuration is extremely flexible, now allowing for the pool to run in a
+variety of setups, such as highly available and redundant configurations.
+Discussed further below in: [Interconnected pools](#Interconnected-pools).
+
This pool was the *first* pool to support RandomX and is currently the *only*
pool which supports the RandomX fast/full-memory mode.
@@ -18,7 +22,7 @@ focussed on performance, efficiency and stability.
The pool also supports an optional method of mining whereby miners select their
*own* block template to mine on. Further information can be found in the
-document: [Stratum mode self-select](./stratum-ss.md).
+document: [Stratum mode self-select](./sss.md).
For testing, a reference mainnet pool can be found at
[monerop.com](http://monerop.com).
@@ -83,15 +87,14 @@ should all be self explanatory.
There are also some [command-line parameters](#command-line-parameters) which
can be used to override some of these settings.
-#### Block notification
+### Block notification
-There is one configuration option that deserves a special mention.
-
-You can optionally start the pool with the flag `--block-notified` (or set in
+The pool can optionally be started with the flag `--block-notified` (or set in
the config file: `block-notified = 1`). This will prevent the pool from
-*polling* for new blocks using a timer, and instead, fetch a new block template
-when it receives a *signal* (specifically, *SIGUSR1*). Now whenever you start
-`monerod`, you'll make use of its `--block-notify` option.
+*polling* for new blocks (using a timer), and instead fetch a new block template
+when it receives a *signal* (specifically, *SIGUSR1*). The Monero daemon,
+`monerod`, has a feature whereby it can execute a command whenever a block as
+added to the chain, which can thus be used to generate the required signal.
E.g.
@@ -99,13 +102,79 @@ E.g.
monerod ... --block-notify '/usr/bin/pkill -USR1 monero-pool'
-This instructs `monerod` to send the required signal, *SIGUSR1*, to your pool
-whenever a new block is added to the chain.
+Launching `monerod` like this instructs it to send the required signal,
+*SIGUSR1*, to the pool whenever a new block is added to the chain.
-Using this mechanism has a *significant* benefit - your pool *immediatley* knows
+Using this mechanism has a *significant* benefit - your pool *immediately* knows
when to fetch a new block template to send to your miners. You're essentially
giving your miners a head-start over miners in pools which use polling (which is
-what all the other pool implementations do).
+what currently all the other pool implementations do).
+
+### Interconnected pools
+
+In some situations it's desirable to run multiple pool instances that behave as
+one. Some examples being:
+
+ - When running a global public pool, it's desirable to offer extremely
+ low-latency pool connections to geographically dispersed miners.
+ - When running a private pool across multiple data centers, it's desirable to
+ make use of the internal site-to-site network makeup.
+ - Where there are multiple hosts available, it's desirable to offer extra
+ redundancy.
+
+To meet these needs, multiple instances of the pool can be run with each
+behaving either as an edge pool, an upstream pool, both (i.e. bridged) or a
+normal single pool.
+
+Any pool that has an upstream pool configured does almost everything a normal
+pool does, with the exception that it offloads payout processing to its upstream
+pool, thus it relays validated shares and blocks to the upstream pool. In
+return, the upstream pool sends the combined pools stats, balance updates and
+handles the payout processing. Should an upstream become unreachable, the
+downstream pools continue as normal, then upon reconnection to the upstream,
+sends over the backlog of shares and blocks accumulated whilst the upstream was
+unreachable.
+
+Configuration is fairly trivial. A pool that will allow downstream pools to
+connect to it, does so via the config file parameters `trusted-listen`,
+`trusted-port` and `trusted-allowed`. E.g.
+
+ trusted-listen = 10.0.0.1
+ trusted-port = 4244
+ trusted-allowed = 10.0.0.2,10.0.0.3
+
+As share validation is performed on the edge pools, it's ***vitally*** important
+this trusted listener is secured. Ideally it's only bound to an internal / local
+network / private interface and specifying the IP addresses of the downstream
+pools allowed to connect to it (as in the example above). If the interface being
+bound to is already secured, the parameter `trusted-allowed` can be omitted.
+
+Then the downstream pools (`10.0.0.2` and `10.0.0.3` in the above example), need
+to include in their config files the parameters `upstream-host` and
+`upstream-port`. E.g.
+
+ upstream-host = 10.0.0.1
+ upstream-port = 4244
+
+To create a bridged pool, use all five parameters discussed above. For example:
+
+ trusted-listen = 10.0.0.4
+ trusted-port = 4244
+ trusted-allowed = 10.0.0.5,10.0.0.6
+ upstream-host = 10.0.0.1
+ upstream-port = 4244
+
+An example where bridging can be useful is for spanning network providers, e.g.
+using a global provider for the main pool hubs (the bridges) and local providers
+for edge pools within a territory.
+
+Every pool, however configured, still needs RPC access to a Monero daemon. They
+can of course all be configured to use the *same* daemon, or for extra
+redundancy, make use of separate daemons. Downstream pools do not need RPC
+access to the pool's wallet, only the final upstream needs wallet access. If
+Stratum mode self-select is being offered, the pool wallet view key can be set
+in the downstream pool config files via the `pool-view-key` parameter, or by
+running a local view-only wallet RPC.
## Running
@@ -132,15 +201,21 @@ command-line parameters:
## Web UI
-There is a minimal web UI that gets served on the port specified in the config
-file. If you plan on running a *public* pool, it's advisable to use either
-Apache or Nginx as a proxy in front of this with some appropriate caching
-configured. The goal is to offload browser based traffic to something built for
-the task and allow the pool to focus on its primary function - serving miners.
+This project is not designed to be a one-stop solution for running a public
+pool; it is an highly efficient mining pool implementation. For a public pool,
+which typically entails having a fancy web UI, that part is down to you. There
+is howeveer a minimal web UI that gets served on the port specified in the
+config file. If you plan on running a *public* pool via this UI (or any other
+for that matter), it's advisable to use either Apache or Nginx as a proxy in
+ front of this with some appropriate caching configured. The goal is to
+ offload browser based traffic to something built for the task and allow the
+ pool to focus on its primary function - serving miners.
-If you intend to make changes to the web UI, note that the HTML gets compiled
-into the pool binary. The single web page that gets served simply makes use of a
-JSON endpoint to populate the stats.
+If you intend to make changes to this minimal web UI, note that the HTML gets
+compiled into the pool binary. The single web page that gets served simply makes
+use of a JSON endpoint to populate the stats. Thus, a sensible option for your
+own web UI is to simply make use of that endpoint (for stats and balances), and
+keep your website completely separate, served by Apache or Nginx for example.
## SSL
diff --git a/pool.conf b/pool.conf
index d44d92b..1462d85 100644
--- a/pool.conf
+++ b/pool.conf
@@ -21,3 +21,9 @@ disable-self-select = 0
data-dir = ./data
pid-file =
forked = 0
+# trusted-listen = 127.0.0.1
+# trusted-port = 4244
+# trusted-allowed = 127.0.0.1,127.0.0.2
+# upstream-host = 127.0.0.1
+# upstream-port = 4244
+# pool-view-key =
diff --git a/src/pool.c b/src/pool.c
index c1ba313..5bddec6 100644
--- a/src/pool.c
+++ b/src/pool.c
@@ -75,7 +75,7 @@ developers.
#define RPC_BODY_MAX 8192
#define JOB_BODY_MAX 8192
#define ERROR_BODY_MAX 512
-#define STATUS_BODY_MAX 256
+#define STATUS_BODY_MAX 512
#define CLIENT_JOBS_MAX 4
#define BLOCK_HEADERS_MAX 4
#define BLOCK_TEMPLATES_MAX 4
@@ -91,6 +91,8 @@ developers.
#define HR_BLOCK_COUNT 5
#define TEMLATE_HEIGHT_VARIANCE 5
#define MAX_BAD_SHARES 5
+#define MAX_DOWNSTREAM 8
+#define MAX_HOST 256
#define uint128_t unsigned __int128
@@ -118,18 +120,25 @@ developers.
Payments
--------
wallet addr <-> payment_t
+
+ Properties
+ ----------
+ name <-> value
*/
-enum block_status { BLOCK_LOCKED=0, BLOCK_UNLOCKED=1, BLOCK_ORPHANED=2 };
-enum stratum_mode { MODE_NORMAL=0, MODE_SELF_SELECT=1 };
+enum block_status { BLOCK_LOCKED, BLOCK_UNLOCKED, BLOCK_ORPHANED };
+enum stratum_mode { MODE_NORMAL, MODE_SELF_SELECT };
+enum msgbin_type { BIN_PING, BIN_CONNECT, BIN_DISCONNECT, BIN_SHARE,
+ BIN_BLOCK, BIN_STATS, BIN_BALANCE };
+const unsigned char msgbin[] = {0x4D,0x4E,0x52,0x4F,0x50,0x4F,0x4F,0x4C};
typedef struct config_t
{
- char rpc_host[256];
- uint32_t rpc_port;
+ char rpc_host[MAX_HOST];
+ uint16_t rpc_port;
uint32_t rpc_timeout;
- char wallet_rpc_host[256];
- uint32_t wallet_rpc_port;
+ char wallet_rpc_host[MAX_HOST];
+ uint16_t wallet_rpc_port;
char pool_wallet[ADDRESS_MAX];
uint64_t pool_start_diff;
double share_mul;
@@ -137,17 +146,23 @@ typedef struct config_t
double retarget_ratio;
double pool_fee;
double payment_threshold;
- char pool_listen[256];
- uint32_t pool_port;
- uint32_t pool_ssl_port;
+ char pool_listen[MAX_HOST];
+ uint16_t pool_port;
+ uint16_t pool_ssl_port;
uint32_t log_level;
- uint32_t webui_port;
+ uint16_t webui_port;
char log_file[MAX_PATH];
bool block_notified;
bool disable_self_select;
char data_dir[MAX_PATH];
char pid_file[MAX_PATH];
bool forked;
+ char trusted_listen[MAX_HOST];
+ uint16_t trusted_port;
+ char trusted_allowed[MAX_DOWNSTREAM][MAX_HOST];
+ char upstream_host[MAX_HOST];
+ uint16_t upstream_port;
+ char pool_view_key[64];
} config_t;
typedef struct block_template_t
@@ -189,6 +204,8 @@ typedef struct client_t
bool is_xnp;
uint32_t mode;
uint8_t bad_shares;
+ bool downstream;
+ uint32_t downstream_miners;
} client_t;
typedef struct pool_clients_t
@@ -235,8 +252,9 @@ static config_t config;
static pool_clients_t pool_clients;
static bstack_t *bst;
static bstack_t *bsh;
-static struct event_base *base;
+static struct event_base *pool_base;
static struct event *listener_event;
+static struct event *timer_30s;
static struct event *timer_120s;
static struct event *timer_10m;
static struct event *signal_usr1;
@@ -248,6 +266,7 @@ static MDB_dbi db_shares;
static MDB_dbi db_blocks;
static MDB_dbi db_balance;
static MDB_dbi db_payments;
+static MDB_dbi db_properties;
static BN_CTX *bn_ctx;
static BIGNUM *base_diff;
static pool_stats_t pool_stats;
@@ -256,6 +275,13 @@ static FILE *fd_log;
static unsigned char sec_view[32];
static unsigned char pub_spend[32];
static uint8_t nettype;
+static pthread_t trusted_th;
+static struct event_base *trusted_base;
+static struct event *trusted_event;
+static struct bufferevent *upstream_event;
+static struct event *timer_10s;
+static time_t upstream_last_time;
+static uint64_t upstream_last_height;
#ifdef HAVE_RX
extern void rx_stop_mining();
@@ -326,11 +352,7 @@ compare_block(const MDB_val *a, const MDB_val *b)
{
const block_t *va = (const block_t*) a->mv_data;
const block_t *vb = (const block_t*) b->mv_data;
- int sc = memcmp(va->hash, vb->hash, 64);
- if (sc == 0)
- return (va->timestamp < vb->timestamp) ? -1 : 1;
- else
- return sc;
+ return (va->timestamp < vb->timestamp) ? -1 : 1;
}
static int
@@ -398,16 +420,28 @@ database_init(const char* data_dir)
log_fatal("%s", err);
exit(rc);
}
+ if ((rc = mdb_dbi_open(txn, "properties", flags, &db_properties)) != 0)
+ {
+ err = mdb_strerror(rc);
+ log_fatal("%s", err);
+ exit(rc);
+ }
+ MDB_val k, v;
+ k.mv_data = "upstream_last_height";
+ k.mv_size = strlen(k.mv_data);
+ if (!mdb_get(txn, db_properties, &k, &v))
+ memcpy(&upstream_last_height, v.mv_data, v.mv_size);
+ k.mv_data = "upstream_last_time";
+ k.mv_size = strlen(k.mv_data);
+ if (!mdb_get(txn, db_properties, &k, &v))
+ memcpy(&upstream_last_time, v.mv_data, v.mv_size);
mdb_set_compare(txn, db_shares, compare_uint64);
mdb_set_dupsort(txn, db_shares, compare_share);
-
mdb_set_compare(txn, db_blocks, compare_uint64);
mdb_set_dupsort(txn, db_blocks, compare_block);
-
mdb_set_compare(txn, db_payments, compare_string);
mdb_set_dupsort(txn, db_payments, compare_payment);
-
mdb_set_compare(txn, db_balance, compare_string);
rc = mdb_txn_commit(txn);
@@ -422,6 +456,7 @@ database_close(void)
mdb_dbi_close(env, db_blocks);
mdb_dbi_close(env, db_balance);
mdb_dbi_close(env, db_payments);
+ mdb_dbi_close(env, db_properties);
mdb_env_close(env);
}
@@ -631,7 +666,7 @@ payout_block(block_t *block, MDB_txn *parent)
/*
PPLNS
*/
- log_info("Payout on block at height %"PRIu64, block->height);
+ log_info("Payout on block at height: %"PRIu64, block->height);
int rc;
char *err;
MDB_txn *txn;
@@ -678,7 +713,7 @@ payout_block(block_t *block, MDB_txn *parent)
share_t *share = (share_t*)val.mv_data;
uint64_t amount = floor((double)share->difficulty /
- ((double)block->difficulty * config.share_mul) * block->reward);
+ ((double)block->difficulty * config.share_mul) * block->reward);
if (total_paid + amount > block->reward)
amount = block->reward - total_paid;
total_paid += amount;
@@ -742,12 +777,12 @@ process_blocks(block_t *blocks, size_t count)
op = MDB_NEXT_DUP;
if (rc == MDB_NOTFOUND || rc != 0)
{
- log_trace("No stored block at height %"PRIu64, ib->height);
+ log_trace("No stored block at height: %"PRIu64, ib->height);
if (rc != MDB_NOTFOUND)
{
err = mdb_strerror(rc);
- log_debug("No stored block at height %"PRIu64
- " with error: %d",
+ log_debug("No stored block at height: %"PRIu64
+ ", with error: %d",
ib->height, err);
}
break;
@@ -763,7 +798,7 @@ process_blocks(block_t *blocks, size_t count)
{
log_trace("Orphaning because hashes differ: %.64s, %.64s",
ib->hash, sb->hash);
- log_debug("Orphaned block at height %"PRIu64, ib->height);
+ log_debug("Orphaned block at height: %"PRIu64, ib->height);
nb.status |= BLOCK_ORPHANED;
MDB_val new_val = {sizeof(block_t), (void*)&nb};
mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
@@ -781,7 +816,7 @@ process_blocks(block_t *blocks, size_t count)
}
if (ib->status & BLOCK_ORPHANED)
{
- log_debug("Orphaned block at height %"PRIu64, ib->height);
+ log_debug("Orphaned block at height: %"PRIu64, ib->height);
nb.status |= BLOCK_ORPHANED;
MDB_val new_val = {sizeof(block_t), (void*)&nb};
mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
@@ -789,10 +824,11 @@ process_blocks(block_t *blocks, size_t count)
}
nb.status |= BLOCK_UNLOCKED;
nb.reward = ib->reward;
- rc = payout_block(&nb, txn);
- if (rc == 0)
+ if (!*config.upstream_host)
+ rc = payout_block(&nb, txn);
+ if (*config.upstream_host || rc == 0)
{
- log_debug("Paid out block %"PRIu64, nb.height);
+ log_debug("Paid out block: %"PRIu64, nb.height);
MDB_val new_val = {sizeof(block_t), (void*)&nb};
mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
}
@@ -865,7 +901,7 @@ retarget(client_t *client, job_t *job)
{
uint64_t target = client_target(client, job);
job->target = target;
- log_debug("Client %.32s target now %"PRIu64, client->client_id, target);
+ log_debug("Miner %.32s target now: %"PRIu64, client->client_id, target);
}
static void
@@ -882,7 +918,7 @@ target_to_hex(uint64_t target, char *target_hex)
#ifdef SIXTY_FOUR_BIT_LONG
BN_set_word(bnt, target);
#else
- char tmp[24];
+ char tmp[24] = {0};
snprintf(tmp, 24, "%"PRIu64, target);
BN_dec2bn(&bnt, tmp);
#endif
@@ -965,7 +1001,7 @@ stratum_get_job_body_ss(char *body, const client_t *client, bool response)
seed_hash = job->miner_template->seed_hash;
next_seed_hash = job->miner_template->next_seed_hash;
}
- unsigned char extra_bin[8];
+ unsigned char extra_bin[8] = {0};
memcpy(extra_bin, &job->extra_nonce, 4);
memcpy(extra_bin+4, &instance_id, 4);
char extra_hex[17] = {0};
@@ -1057,7 +1093,7 @@ static void
send_validation_error(const client_t *client, const char *message)
{
struct evbuffer *output = bufferevent_get_output(client->bev);
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, message);
evbuffer_add(output, body, strlen(body));
log_debug("Validation error: %s", message);
@@ -1109,7 +1145,7 @@ client_find_job(client_t *client, const char *job_id)
}
static void
-client_send_job(client_t *client, bool response)
+miner_send_job(client_t *client, bool response)
{
/* First cycle jobs */
job_t *last = &client->active_jobs[CLIENT_JOBS_MAX-1];
@@ -1153,9 +1189,9 @@ client_send_job(client_t *client, bool response)
retarget(client, job);
++extra_nonce;
job->extra_nonce = extra_nonce;
- char body[JOB_BODY_MAX];
+ char body[JOB_BODY_MAX] = {0};
stratum_get_job_body_ss(body, client, response);
- log_trace("Client job: %s", body);
+ log_trace("Miner job: %s", body);
struct evbuffer *output = bufferevent_get_output(client->bev);
evbuffer_add(output, body, strlen(body));
return;
@@ -1213,7 +1249,7 @@ client_send_job(client_t *client, bool response)
/* Retarget */
retarget(client, job);
- char body[JOB_BODY_MAX];
+ char body[JOB_BODY_MAX] = {0};
if (!client->is_xnp)
{
stratum_get_job_body(body, client, response);
@@ -1225,7 +1261,7 @@ client_send_job(client_t *client, bool response)
stratum_get_proxy_job_body(body, client, block_hex, response);
free(block_hex);
}
- log_trace("Client job: %s", body);
+ log_trace("Miner job: %.*s", strlen(body)-1, body);
struct evbuffer *output = bufferevent_get_output(client->bev);
evbuffer_add(output, body, strlen(body));
free(block);
@@ -1238,9 +1274,9 @@ pool_clients_send_job(void)
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
- if (c->fd == 0 || c->address[0] == 0)
+ if (c->fd == 0 || c->address[0] == 0 || c->downstream)
continue;
- client_send_job(c, false);
+ miner_send_job(c, false);
}
}
@@ -1356,7 +1392,8 @@ rpc_on_response(struct evhttp_request *req, void *arg)
}
static void
-rpc_request(struct event_base *base, const char *body, rpc_callback_t *callback)
+rpc_request(struct event_base *base, const char *body,
+ rpc_callback_t *callback)
{
struct evhttp_connection *con;
struct evhttp_request *req;
@@ -1428,7 +1465,7 @@ rpc_get_request_body(char *body, const char *method, char *fmt, ...)
break;
case 'd':
d = va_arg(args, uint64_t);
- char tmp[24];
+ char tmp[24] = {0};
snprintf(tmp, 24, "%"PRIu64, d);
pb = stecpy(pb, tmp, end);
break;
@@ -1557,6 +1594,9 @@ startup_scan_round_shares()
MDB_txn *txn;
MDB_cursor *cursor;
+ if (*config.upstream_host)
+ return 0;
+
if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)) != 0)
{
err = mdb_strerror(rc);
@@ -1639,21 +1679,22 @@ startup_payout(uint64_t height)
break;
pool_stats.pool_blocks_found++;
-
block_t *block = (block_t*)val.mv_data;
- pool_stats.last_block_found = block->timestamp;
+
+ if (!upstream_event)
+ pool_stats.last_block_found = block->timestamp;
if (block->height > height - 60)
continue;
if (block->status != BLOCK_LOCKED)
continue;
- char body[RPC_BODY_MAX];
+ char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "get_block_header_by_height", "sd",
"height", block->height);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_block_header_by_height, NULL);
- rpc_request(base, body, cb);
+ rpc_request(pool_base, body, cb);
}
mdb_cursor_close(cursor);
@@ -1740,12 +1781,12 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback)
if (need_new_template)
{
log_info("Fetching new block template");
- char body[RPC_BODY_MAX];
+ char body[RPC_BODY_MAX] = {0};
uint64_t reserve = 17;
rpc_get_request_body(body, "get_block_template", "sssd",
"wallet_address", config.pool_wallet, "reserve_size", reserve);
rpc_callback_t *cb1 = rpc_callback_new(rpc_on_block_template, NULL);
- rpc_request(base, body, cb1);
+ rpc_request(pool_base, body, cb1);
uint64_t end = front->height - 60;
uint64_t start = end - BLOCK_HEADERS_RANGE + 1;
@@ -1753,7 +1794,7 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback)
"start_height", start, "end_height", end);
rpc_callback_t *cb2 = rpc_callback_new(
rpc_on_block_headers_range, NULL);
- rpc_request(base, body, cb2);
+ rpc_request(pool_base, body, cb2);
}
json_object_put(root);
@@ -1787,8 +1828,11 @@ rpc_on_block_submitted(const char* data, rpc_callback_t *callback)
}
pool_stats.pool_blocks_found++;
block_t *b = (block_t*)callback->data;
- pool_stats.last_block_found = b->timestamp;
- pool_stats.round_hashes = 0;
+ if (!upstream_event)
+ {
+ pool_stats.last_block_found = b->timestamp;
+ pool_stats.round_hashes = 0;
+ }
log_info("Block submitted at height: %"PRIu64, b->height);
int rc = store_block(b->height, b);
if (rc != 0)
@@ -1918,6 +1962,8 @@ cleanup:
static int
send_payments(void)
{
+ if (*config.upstream_host)
+ return 0;
uint64_t threshold = 1000000000000 * config.payment_threshold;
int rc;
char *err;
@@ -1960,9 +2006,9 @@ send_payments(void)
if (amount < threshold)
continue;
- log_info("Sending payment of %"PRIu64" to %s\n", amount, address);
+ log_info("Sending payment: %"PRIu64", %.8s", amount, address);
- strncpy(payment->address, address, ADDRESS_MAX);
+ strncpy(payment->address, address, ADDRESS_MAX-1);
payment->amount = amount;
payments_count++;
@@ -2004,7 +2050,7 @@ send_payments(void)
log_trace(body);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_wallet_transferred, payments);
- rpc_wallet_request(base, body, cb);
+ rpc_wallet_request(pool_base, body, cb);
}
else
free(payments);
@@ -2015,20 +2061,502 @@ send_payments(void)
static void
fetch_view_key(void)
{
- char body[RPC_BODY_MAX];
+ if (*config.pool_view_key)
+ {
+ hex_to_bin(config.pool_view_key, 64, sec_view, 32);
+ log_info("Using pool view-key: %.4s", config.pool_view_key);
+ return;
+ }
+ if (*sec_view)
+ return;
+ char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "query_key", "ss", "key_type", "view_key");
rpc_callback_t *cb = rpc_callback_new(rpc_on_view_key, NULL);
- rpc_wallet_request(base, body, cb);
+ rpc_wallet_request(pool_base, body, cb);
}
static void
fetch_last_block_header(void)
{
log_info("Fetching last block header");
- char body[RPC_BODY_MAX];
+ char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "get_last_block_header", NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_last_block_header, NULL);
- rpc_request(base, body, cb);
+ rpc_request(pool_base, body, cb);
+}
+
+static int
+store_last_height_time()
+{
+ int rc;
+ char *err;
+ MDB_txn *txn;
+ MDB_val k, v;
+ if ((rc = mdb_txn_begin(env, NULL, 0, &txn)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ return rc;
+ }
+ k.mv_data = "upstream_last_height";
+ k.mv_size = strlen(k.mv_data);
+ v.mv_data = &upstream_last_height;
+ v.mv_size = sizeof(upstream_last_height);
+ if ((rc = mdb_put(txn, db_properties, &k, &v, 0)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ mdb_txn_abort(txn);
+ return rc;
+ }
+ k.mv_data = "upstream_last_time";
+ k.mv_size = strlen(k.mv_data);
+ v.mv_data = &upstream_last_time;
+ v.mv_size = sizeof(upstream_last_time);
+ if ((rc = mdb_put(txn, db_properties, &k, &v, 0)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ mdb_txn_abort(txn);
+ return rc;
+ }
+ if ((rc = mdb_txn_commit(txn)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ return rc;
+ }
+ return rc;
+}
+
+static void
+trusted_send_stats(client_t *client)
+{
+ struct evbuffer *output = bufferevent_get_output(client->bev);
+ size_t z = 9 + sizeof(pool_stats);
+ char data[z];
+ int t = BIN_STATS;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ memcpy(data+9, &pool_stats, z-9);
+ evbuffer_add(output, data, z);
+}
+
+static void
+trusted_send_balance(client_t *client, const char *address)
+{
+ struct evbuffer *output = bufferevent_get_output(client->bev);
+ size_t z = 9 + sizeof(uint64_t) + ADDRESS_MAX;
+ char data[z];
+ int t = BIN_BALANCE;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ uint64_t balance = miner_balance(address);
+ memcpy(data+9, &balance, sizeof(uint64_t));
+ memcpy(data+9+sizeof(uint64_t), address, ADDRESS_MAX);
+ evbuffer_add(output, data, z);
+}
+
+static void
+upstream_send_ping()
+{
+ struct evbuffer *output = bufferevent_get_output(upstream_event);
+ char data[9];
+ int t = BIN_PING;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ evbuffer_add(output, data, 9);
+ log_trace("Sending message ping upstream");
+}
+
+static void
+upstream_send_client_connect()
+{
+ struct evbuffer *output = bufferevent_get_output(upstream_event);
+ char data[9];
+ int t = BIN_CONNECT;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ evbuffer_add(output, data, 9);
+ log_trace("Sending message connect upstream");
+}
+
+static void
+upstream_send_client_disconnect()
+{
+ struct evbuffer *output = bufferevent_get_output(upstream_event);
+ char data[9];
+ int t = BIN_DISCONNECT;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ evbuffer_add(output, data, 9);
+ log_trace("Sending message disconnect upstream");
+}
+
+static void
+upstream_send_client_share(share_t *share)
+{
+ struct evbuffer *output = bufferevent_get_output(upstream_event);
+ size_t z = 9 + sizeof(share_t);
+ char data[z];
+ int t = BIN_SHARE;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ memcpy(data+9, share, z-9);
+ evbuffer_add(output, data, z);
+ bool update_req = false;
+ if (share->height > upstream_last_height)
+ {
+ upstream_last_height = share->height;
+ update_req = true;
+ }
+ if (share->timestamp > upstream_last_time)
+ {
+ upstream_last_time = share->timestamp;
+ update_req = true;
+ }
+ if (update_req)
+ store_last_height_time();
+ log_trace("Sending share upstream: %"PRIu64", %"PRIu64", %"PRIu64,
+ share->difficulty, share->height, share->timestamp);
+}
+
+static void
+upstream_send_client_block(block_t *block)
+{
+ struct evbuffer *output = bufferevent_get_output(upstream_event);
+ size_t z = 9 + sizeof(block_t);
+ char data[z];
+ int t = BIN_BLOCK;
+ memcpy(data, msgbin, 8);
+ memcpy(data+8, &t, 1);
+ memcpy(data+9, block, z-9);
+ evbuffer_add(output, data, z);
+ bool update_req = false;
+ if (block->height > upstream_last_height)
+ {
+ upstream_last_height = block->height;
+ update_req = true;
+ }
+ if (block->timestamp > upstream_last_time)
+ {
+ upstream_last_time = block->timestamp;
+ update_req = true;
+ }
+ if (update_req)
+ store_last_height_time();
+ log_info("Sending block upstream: %.8s, %d, %d",
+ block->hash, block->height, block->timestamp);
+}
+
+static void
+upstream_send_backlog()
+{
+ /*
+ Send any unsent shares and blocks upstream.
+ */
+ int rc;
+ char *err;
+ MDB_txn *txn;
+ MDB_cursor *curshr, *curblk;
+ if (!upstream_last_height || !upstream_last_time || !upstream_event)
+ return;
+ log_info("Sending upstream shares/blocks since: %"PRIu64", %"PRIu64,
+ upstream_last_height, upstream_last_time);
+ if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ return;
+ }
+ if ((rc = mdb_cursor_open(txn, db_shares, &curshr)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ mdb_txn_abort(txn);
+ return;
+ }
+ if ((rc = mdb_cursor_open(txn, db_blocks, &curblk)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ mdb_txn_abort(txn);
+ return;
+ }
+ uint64_t h = upstream_last_height;
+ time_t t = upstream_last_time;
+ MDB_cursor_op op = MDB_SET;
+ while (1)
+ {
+ MDB_val k, v;
+ if (op == MDB_SET)
+ {
+ k.mv_size = sizeof(h);
+ k.mv_data = &h;
+ }
+ if (mdb_cursor_get(curshr, &k, &v, op))
+ break;
+ op = MDB_NEXT;
+ share_t *s = (share_t*) v.mv_data;
+ if (s->timestamp <= t)
+ continue;
+ upstream_send_client_share(s);
+ }
+ op = MDB_SET;
+ while (1)
+ {
+ MDB_val k, v;
+ if (op == MDB_SET)
+ {
+ k.mv_size = sizeof(h);
+ k.mv_data = &h;
+ }
+ if (mdb_cursor_get(curblk, &k, &v, op))
+ break;
+ op = MDB_NEXT;
+ block_t *b = (block_t*) v.mv_data;
+ if (b->timestamp <= t)
+ continue;
+ upstream_send_client_block(b);
+ }
+ mdb_cursor_close(curshr);
+ mdb_cursor_close(curblk);
+ mdb_txn_abort(txn);
+}
+
+static void
+trusted_on_client_connect(client_t *client)
+{
+ pool_stats.connected_miners++;
+ client->downstream_miners++;
+ log_trace("Downstream miner connected. "
+ "Miner count: %d, Pool hashrate: %"PRIu64,
+ pool_stats.connected_miners, pool_stats.pool_hashrate);
+ trusted_send_stats(client);
+ if (upstream_event)
+ upstream_send_client_connect();
+}
+
+static void
+trusted_on_client_disconnect(client_t *client)
+{
+ pool_stats.connected_miners--;
+ if (client->downstream_miners)
+ client->downstream_miners--;
+ log_trace("Downstream miner disconnected. "
+ "Miner count: %d, Pool hashrate: %"PRIu64,
+ pool_stats.connected_miners, pool_stats.pool_hashrate);
+ trusted_send_stats(client);
+ if (upstream_event)
+ upstream_send_client_disconnect();
+}
+
+static void
+trusted_on_client_share(client_t *client)
+{
+ /*
+ Downstream validated, so just store for payouts.
+ */
+ struct evbuffer *input = bufferevent_get_input(client->bev);
+ share_t s;
+ int rc;
+ evbuffer_remove(input, (void*)&s, sizeof(share_t));
+ log_debug("Received share from downstream with difficulty: %"PRIu64,
+ s.difficulty);
+ client->hashes += s.difficulty;
+ pool_stats.round_hashes += s.difficulty;
+ rc = store_share(s.height, &s);
+ if (rc != 0)
+ log_warn("Failed to store share: %s", mdb_strerror(rc));
+ trusted_send_stats(client);
+ trusted_send_balance(client, s.address);
+ if (upstream_event)
+ upstream_send_client_share(&s);
+}
+
+static void
+trusted_on_client_block(client_t *client)
+{
+ struct evbuffer *input = bufferevent_get_input(client->bev);
+ block_t b;
+ int rc;
+ evbuffer_remove(input, (void*)&b, sizeof(block_t));
+ pool_stats.pool_blocks_found++;
+ pool_stats.last_block_found = b.timestamp;
+ pool_stats.round_hashes = 0;
+ log_info("Block submitted by downstream: %.8s, %"PRIu64, b.hash, b.height);
+ rc = store_block(b.height, &b);
+ if (rc != 0)
+ log_warn("Failed to store block: %s", mdb_strerror(rc));
+ trusted_send_stats(client);
+ if (upstream_event)
+ upstream_send_client_block(&b);
+}
+
+static void
+upstream_on_stats(struct bufferevent *bev)
+{
+ struct evbuffer *input = bufferevent_get_input(bev);
+ evbuffer_remove(input, &pool_stats, sizeof(pool_stats_t));
+ log_trace("Stats from upstream: "
+ "%d, %"PRIu64", %"PRIu64", %d, %"PRIu64,
+ pool_stats.connected_miners,
+ pool_stats.pool_hashrate,
+ pool_stats.round_hashes,
+ pool_stats.pool_blocks_found,
+ pool_stats.last_block_found);
+}
+
+static int
+upstream_on_balance(struct bufferevent *bev)
+{
+ int rc;
+ char *err;
+ MDB_txn *txn;
+ uint64_t balance;
+ char address[ADDRESS_MAX];
+ struct evbuffer *input = bufferevent_get_input(bev);
+ evbuffer_remove(input, &balance, sizeof(uint64_t));
+ evbuffer_remove(input, address, ADDRESS_MAX);
+ log_trace("Balance from upstream: %.8s, %"PRIu64, address, balance);
+ if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ return rc;
+ }
+ MDB_val k = {ADDRESS_MAX, (void*)address};
+ MDB_val v = {sizeof(uint64_t), (void*)&balance};
+ if ((rc = mdb_put(txn, db_balance, &k, &v, 0)))
+ {
+ err = mdb_strerror(rc);
+ log_error("%s", err);
+ mdb_txn_abort(txn);
+ return rc;
+ }
+ rc = mdb_txn_commit(txn);
+ return rc;
+}
+
+static void
+upstream_on_read(struct bufferevent *bev, void *ctx)
+{
+ struct evbuffer *input = bufferevent_get_input(bev);
+ struct evbuffer_ptr tag;
+ unsigned char tnt[9];
+ size_t len;
+
+ input = bufferevent_get_input(bev);
+ while ((len = evbuffer_get_length(input)) >= 9)
+ {
+ tag = evbuffer_search(input, (const char*) msgbin, 8, NULL);
+ if (tag.pos < 0)
+ {
+ log_error("Bad message from upstream");
+ evbuffer_drain(input, len);
+ return;
+ }
+
+ evbuffer_remove(input, tnt, 9);
+
+
+ switch (tnt[8])
+ {
+ case BIN_STATS:
+ upstream_on_stats(bev);
+ break;
+ case BIN_BALANCE:
+ upstream_on_balance(bev);
+ break;
+ default:
+ log_error("Unsupported message type: %d", tnt[8]);
+ evbuffer_drain(input, evbuffer_get_length(input));
+ return;
+ }
+ }
+}
+
+static void
+upstream_on_event(struct bufferevent *bev, short error, void *ctx)
+{
+ if (error & BEV_EVENT_CONNECTED)
+ {
+ log_info("Connected to upstream: %s:%d",
+ config.upstream_host, config.upstream_port);
+ upstream_send_backlog();
+ return;
+ }
+ if (error & BEV_EVENT_EOF)
+ {
+ log_debug("Upstream disconnected");
+ }
+ else if (error & BEV_EVENT_ERROR)
+ {
+ log_debug("Upstream connection error: %d", errno);
+ }
+ else if (error & BEV_EVENT_TIMEOUT)
+ {
+ log_debug("Upstream timeout");
+ }
+ /* Wait and try to reconnect */
+ if (upstream_event)
+ {
+ bufferevent_free(upstream_event);
+ upstream_event = NULL;
+ }
+ log_warn("No connection to upstream; retrying in 10s");
+ struct timeval timeout = { .tv_sec = 10, .tv_usec = 0 };
+ evtimer_add(timer_10s, &timeout);
+}
+
+static void
+upstream_connect()
+{
+ struct addrinfo *info;
+ int rc;
+ char port[6] = {0};
+
+ sprintf(port, "%d", config.upstream_port);
+ if ((rc = getaddrinfo(config.upstream_host, port, 0, &info)))
+ {
+ log_fatal("Error parsing upstream host: %s", gai_strerror(rc));
+ return;
+ }
+
+ upstream_event = bufferevent_socket_new(pool_base, -1,
+ BEV_OPT_CLOSE_ON_FREE);
+
+ if (bufferevent_socket_connect(upstream_event,
+ info->ai_addr, info->ai_addrlen) < 0)
+ {
+ perror("connect");
+ goto bail;
+ }
+
+ bufferevent_setcb(upstream_event,
+ upstream_on_read, NULL, upstream_on_event, NULL);
+ bufferevent_enable(upstream_event, EV_READ|EV_WRITE);
+ evutil_make_socket_nonblocking(bufferevent_getfd(upstream_event));
+
+bail:
+ freeaddrinfo(info);
+}
+
+static void
+timer_on_10s(int fd, short kind, void *ctx)
+{
+ log_info("Reconnecting to upstream: %s:%d",
+ config.upstream_host, config.upstream_port);
+ upstream_connect();
+}
+
+static void
+timer_on_30s(int fd, short kind, void *ctx)
+{
+ if (upstream_event)
+ upstream_send_ping();
+ struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
+ evtimer_add(timer_30s, &timeout);
}
static void
@@ -2049,9 +2577,8 @@ timer_on_10m(int fd, short kind, void *ctx)
}
static void
-client_add(int fd, struct bufferevent *bev)
+client_add(int fd, struct bufferevent *bev, bool downstream)
{
- log_info("New client connected");
client_t *c = pool_clients.clients;
bool resize = true;
for (size_t i = 0; i < pool_clients.count; i++, c++)
@@ -2079,7 +2606,11 @@ client_add(int fd, struct bufferevent *bev)
c->fd = fd;
c->bev = bev;
c->connected_since = time(NULL);
- pool_stats.connected_miners++;
+ c->downstream = downstream;
+ if (!downstream)
+ pool_stats.connected_miners++;
+ if (upstream_event)
+ upstream_send_client_connect();
}
static void
@@ -2111,13 +2642,18 @@ client_clear(struct bufferevent *bev)
if (!client)
return;
client_clear_jobs(client);
+ if (client->downstream)
+ pool_stats.connected_miners -= client->downstream_miners;
+ else
+ pool_stats.connected_miners--;
memset(client, 0, sizeof(client_t));
bufferevent_free(bev);
- pool_stats.connected_miners--;
+ if (upstream_event)
+ upstream_send_client_disconnect();
}
static void
-client_on_login(json_object *message, client_t *client)
+miner_on_login(json_object *message, client_t *client)
{
JSON_GET_OR_ERROR(params, message, json_type_object, client);
JSON_GET_OR_ERROR(login, params, json_type_string, client);
@@ -2140,7 +2676,7 @@ client_on_login(json_object *message, client_t *client)
return;
}
client->mode = MODE_SELF_SELECT;
- log_trace("Client login for mode: self-select");
+ log_trace("Miner login for mode: self-select");
}
}
}
@@ -2181,17 +2717,17 @@ client_on_login(json_object *message, client_t *client)
return;
}
- strncpy(client->address, address, sizeof(client->address));
- strncpy(client->worker_id, worker_id, sizeof(client->worker_id));
+ strncpy(client->address, address, sizeof(client->address)-1);
+ strncpy(client->worker_id, worker_id, sizeof(client->worker_id)-1);
uuid_t cid;
uuid_generate(cid);
bin_to_hex((const unsigned char*)cid, sizeof(uuid_t),
client->client_id, 32);
- client_send_job(client, true);
+ miner_send_job(client, true);
}
static void
-client_on_block_template(json_object *message, client_t *client)
+miner_on_block_template(json_object *message, client_t *client)
{
struct evbuffer *output = bufferevent_get_output(client->bev);
@@ -2214,8 +2750,9 @@ client_on_block_template(json_object *message, client_t *client)
int64_t dh = llabs(h - (int64_t)pool_stats.network_height);
if (dh > TEMLATE_HEIGHT_VARIANCE)
{
- char m[64];
- snprintf(m, 64, "Bad height. Differs to pool by %"PRIu64" blocks.", dh);
+ char m[64] = {0};
+ snprintf(m, 64, "Bad height. "
+ "Differs to pool by %"PRIu64" blocks.", dh);
send_validation_error(client, m);
return;
}
@@ -2266,14 +2803,14 @@ client_on_block_template(json_object *message, client_t *client)
json_object_get_string(next_seed_hash), 64);
}
- log_trace("Client set template: %s", btb);
- char body[STATUS_BODY_MAX];
+ log_trace("Miner set template: %s", btb);
+ char body[STATUS_BODY_MAX] = {0};
stratum_get_status_body(body, client->json_id, "OK");
evbuffer_add(output, body, strlen(body));
}
static void
-client_on_submit(json_object *message, client_t *client)
+miner_on_submit(json_object *message, client_t *client)
{
struct evbuffer *output = bufferevent_get_output(client->bev);
@@ -2319,7 +2856,7 @@ client_on_submit(json_object *message, client_t *client)
return;
}
- log_trace("Client submitted nonce=%u, result=%s",
+ log_trace("Miner submitted nonce=%u, result=%s",
result_nonce, result_hex);
/*
1. Validate submission
@@ -2335,8 +2872,8 @@ client_on_submit(json_object *message, client_t *client)
check result hash against template difficulty
(submit to network if good) add share to db
- Note reserved space is: extra_nonce, instance_id, pool_nonce, worker_nonce
- 4 bytes each.
+ Note reserved space is: extra_nonce|instance_id|pool_nonce|worker_nonce
+ 4 bytes each.
*/
/* Convert template to blob */
@@ -2400,7 +2937,7 @@ client_on_submit(json_object *message, client_t *client)
{
if (submissions[i] == sub)
{
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, "Duplicate share");
evbuffer_add(output, body, strlen(body));
log_debug("Duplicate share");
@@ -2423,7 +2960,7 @@ client_on_submit(json_object *message, client_t *client)
if (get_hashing_blob(block, bin_size,
&hashing_blob, &hashing_blob_size) != 0)
{
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, "Invalid block");
evbuffer_add(output, body, strlen(body));
log_debug("Invalid block");
@@ -2438,7 +2975,7 @@ client_on_submit(json_object *message, client_t *client)
uint8_t pow_variant = major_version >= 7 ? major_version - 6 : 0;
if (pow_variant >= 6)
{
- unsigned char seed_hash[32];
+ unsigned char seed_hash[32] = {0};
hex_to_bin(bt->seed_hash, 64, seed_hash, 32);
get_rx_hash(hashing_blob, hashing_blob_size,
(unsigned char*)result_hash, seed_hash, bt->height);
@@ -2452,7 +2989,7 @@ client_on_submit(json_object *message, client_t *client)
if (memcmp(submitted_hash, result_hash, 32) != 0)
{
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, "Invalid share");
evbuffer_add(output, body, strlen(body));
log_debug("Invalid share");
@@ -2487,7 +3024,7 @@ client_on_submit(json_object *message, client_t *client)
log_info("+++ MINED A BLOCK +++");
char *block_hex = calloc((bin_size << 1)+1, sizeof(char));
bin_to_hex(block, bin_size, block_hex, bin_size << 1);
- char body[RPC_BODY_MAX];
+ char body[RPC_BODY_MAX] = {0};
snprintf(body, RPC_BODY_MAX,
"{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":"
"\"submit_block\", \"params\":[\"%s\"]}",
@@ -2505,14 +3042,15 @@ client_on_submit(json_object *message, client_t *client)
b->difficulty = bt->difficulty;
b->status = BLOCK_LOCKED;
b->timestamp = now;
-
- rpc_request(base, body, cb);
+ if (upstream_event)
+ upstream_send_client_block(b);
+ rpc_request(pool_base, body, cb);
free(block_hex);
}
else if (BN_cmp(hd, jd) < 0)
{
can_store = false;
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, "Low difficulty share");
evbuffer_add(output, body, strlen(body));
log_debug("Low difficulty (%lu) share", BN_get_word(jd));
@@ -2532,27 +3070,30 @@ client_on_submit(json_object *message, client_t *client)
share_t share;
share.height = bt->height;
share.difficulty = job->target;
- strncpy(share.address, client->address, sizeof(share.address));
+ strncpy(share.address, client->address, sizeof(share.address)-1);
share.timestamp = now;
- pool_stats.round_hashes += share.difficulty;
+ if (!upstream_event)
+ pool_stats.round_hashes += share.difficulty;
log_debug("Storing share with difficulty: %"PRIu64, share.difficulty);
int rc = store_share(share.height, &share);
if (rc != 0)
log_warn("Failed to store share: %s", mdb_strerror(rc));
- char body[STATUS_BODY_MAX];
+ char body[STATUS_BODY_MAX] = {0};
stratum_get_status_body(body, client->json_id, "OK");
evbuffer_add(output, body, strlen(body));
+ if (upstream_event)
+ upstream_send_client_share(&share);
}
if (retarget_required(client, job))
{
log_debug("Sending an early job as this was less than %u%% of"
" potential", (unsigned)(100.*config.retarget_ratio));
- client_send_job(client, false);
+ miner_send_job(client, false);
}
}
static void
-client_on_read(struct bufferevent *bev, void *ctx)
+miner_on_read(struct bufferevent *bev, void *ctx)
{
const char *unknown_method = "Removing client. Unknown method called.";
const char *too_bad = "Removing client. Too many bad shares.";
@@ -2573,7 +3114,7 @@ client_on_read(struct bufferevent *bev, void *ctx)
size_t len = evbuffer_get_length(input);
if (len > MAX_LINE)
{
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, too_long);
evbuffer_add(output, body, strlen(body));
log_info(too_long);
@@ -2588,7 +3129,7 @@ client_on_read(struct bufferevent *bev, void *ctx)
if (!message)
{
free(line);
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, invalid_json);
evbuffer_add(output, body, strlen(body));
log_info(invalid_json);
@@ -2609,23 +3150,23 @@ client_on_read(struct bufferevent *bev, void *ctx)
}
else if (strcmp(method_name, "login") == 0)
{
- client_on_login(message, client);
+ miner_on_login(message, client);
}
else if (strcmp(method_name, "block_template") == 0)
{
- client_on_block_template(message, client);
+ miner_on_block_template(message, client);
}
else if (strcmp(method_name, "submit") == 0)
{
- client_on_submit(message, client);
+ miner_on_submit(message, client);
}
else if (strcmp(method_name, "getjob") == 0)
{
- client_send_job(client, false);
+ miner_send_job(client, false);
}
else if (strcmp(method_name, "keepalived") == 0)
{
- char body[STATUS_BODY_MAX];
+ char body[STATUS_BODY_MAX] = {0};
stratum_get_status_body(body, client->json_id, "KEEPALIVED");
evbuffer_add(output, body, strlen(body));
}
@@ -2639,7 +3180,7 @@ client_on_read(struct bufferevent *bev, void *ctx)
if (unknown)
{
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, unknown_method);
evbuffer_add(output, body, strlen(body));
log_info(unknown_method);
@@ -2649,7 +3190,7 @@ client_on_read(struct bufferevent *bev, void *ctx)
}
if (client->bad_shares > MAX_BAD_SHARES)
{
- char body[ERROR_BODY_MAX];
+ char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, too_bad);
evbuffer_add(output, body, strlen(body));
log_info(too_bad);
@@ -2661,30 +3202,92 @@ client_on_read(struct bufferevent *bev, void *ctx)
}
static void
-client_on_error(struct bufferevent *bev, short error, void *ctx)
+trusted_on_read(struct bufferevent *bev, void *ctx)
{
+ struct evbuffer *input;
+ client_t *client = NULL;
+ struct evbuffer_ptr tag;
+ unsigned char tnt[9];
+ size_t len;
+
+ client_find(bev, &client);
+ if (!client)
+ return;
+ if (!client->downstream)
+ {
+ /* should never happen; sanity check */
+ log_trace("Only trusted downstreams allowed");
+ client_clear(bev);
+ return;
+ }
+
+ input = bufferevent_get_input(bev);
+
+ while ((len = evbuffer_get_length(input)) >= 9)
+ {
+ tag = evbuffer_search(input, (const char*) msgbin, 8, NULL);
+ if (tag.pos < 0)
+ {
+ log_error("Bad message from downstream");
+ evbuffer_drain(input, len);
+ client_clear(bev);
+ return;
+ }
+
+ evbuffer_remove(input, tnt, 9);
+ log_trace("Downstream message: %d", tnt[8]);
+ switch (tnt[8])
+ {
+ case BIN_PING:
+ case BIN_STATS:
+ trusted_send_stats(client);
+ break;
+ case BIN_CONNECT:
+ trusted_on_client_connect(client);
+ break;
+ case BIN_DISCONNECT:
+ trusted_on_client_disconnect(client);
+ break;
+ case BIN_SHARE:
+ trusted_on_client_share(client);
+ break;
+ case BIN_BLOCK:
+ trusted_on_client_block(client);
+ break;
+ default:
+ log_error("Unknown message: %d", tnt[8]);
+ evbuffer_drain(input, len);
+ client_clear(bev);
+ return;
+ }
+ }
+}
+
+static void
+listener_on_error(struct bufferevent *bev, short error, void *ctx)
+{
+ struct event_base *base = (struct event_base*)ctx;
+ char *type = base != trusted_base ? "Miner" : "Downstream";
if (error & BEV_EVENT_EOF)
{
- /* connection has been closed */
- log_debug("Client disconnected. Removing.");
+ log_debug("%s disconnected. Removing.", type);
}
else if (error & BEV_EVENT_ERROR)
{
- /* check errno to see what error occurred */
- log_debug("Client error: %d. Removing.", errno);
+ log_debug("%s error: %d. Removing.", type, errno);
}
else if (error & BEV_EVENT_TIMEOUT)
{
- /* must be a timeout event handle, handle it */
- log_debug("Client timeout. Removing.");
+ log_debug("%s timeout. Removing.", type);
}
client_clear(bev);
}
static void
-client_on_accept(evutil_socket_t listener, short event, void *arg)
+listener_on_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = (struct event_base*)arg;
+ char *type = base != trusted_base ? "miner" : "downstream";
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
@@ -2693,12 +3296,46 @@ client_on_accept(evutil_socket_t listener, short event, void *arg)
perror("accept");
return;
}
+ if (base == trusted_base && *config.trusted_allowed[0])
+ {
+ char *s = config.trusted_allowed[0];
+ char *e = s + (MAX_DOWNSTREAM * MAX_HOST);
+ char host[MAX_HOST];
+ bool match = false;
+ int rc = 0;
+ if ((rc = getnameinfo((struct sockaddr*)&ss, slen,
+ host, MAX_HOST, NULL, 0, NI_NUMERICHOST)))
+ {
+ log_error("Error parsing trusted allowed address: %s",
+ gai_strerror(rc));
+ return;
+ }
+ while (s < e)
+ {
+ if (strncmp(s, host, MAX_HOST) == 0)
+ {
+ match = true;
+ break;
+ }
+ s += MAX_HOST;
+ }
+ if (!match)
+ {
+ close(fd);
+ log_error("Host %s not allowed as trusted downstream", host);
+ return;
+ }
+ }
struct bufferevent *bev;
evutil_make_socket_nonblocking(fd);
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
- bufferevent_setcb(bev, client_on_read, NULL, client_on_error, NULL);
+ bufferevent_setcb(bev,
+ base == trusted_base ? trusted_on_read : miner_on_read,
+ NULL, listener_on_error, arg);
bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
- client_add(fd, bev);
+ client_add(fd, bev, base == trusted_base);
+ log_info("New %s connected. Miner count: %d, Pool hashrate: %"PRIu64,
+ type, pool_stats.connected_miners, pool_stats.pool_hashrate);
bufferevent_enable(bev, EV_READ|EV_WRITE);
}
@@ -2727,7 +3364,7 @@ read_config(const char *config_file)
char path[MAX_PATH] = {0};
if (config_file)
{
- strncpy(path, config_file, MAX_PATH);
+ strncpy(path, config_file, MAX_PATH-1);
}
else
{
@@ -2739,7 +3376,7 @@ read_config(const char *config_file)
strcat(path, "/pool.conf");
if (access(path, R_OK) != 0)
{
- strncpy(path, getenv("HOME"), MAX_PATH);
+ strncpy(path, getenv("HOME"), MAX_PATH-1);
strcat(path, "/pool.conf");
if (access(path, R_OK) != 0)
{
@@ -2749,7 +3386,7 @@ read_config(const char *config_file)
}
}
}
- log_info("Reading config at: %s", path);
+ log_info("Reading config from: %s", path);
FILE *fp = fopen(path, "r");
if (!fp)
@@ -2757,7 +3394,7 @@ read_config(const char *config_file)
log_fatal("Cannot open config file. Aborting.");
exit(-1);
}
- char line[1024];
+ char line[1024] = {0};
char *key;
char *val;
const char *tok = " =";
@@ -2774,7 +3411,7 @@ read_config(const char *config_file)
val[strcspn(val, "\r\n")] = 0;
if (strcmp(key, "pool-listen") == 0)
{
- strncpy(config.pool_listen, val, sizeof(config.pool_listen));
+ strncpy(config.pool_listen, val, sizeof(config.pool_listen)-1);
}
else if (strcmp(key, "pool-port") == 0)
{
@@ -2790,7 +3427,7 @@ read_config(const char *config_file)
}
else if (strcmp(key, "rpc-host") == 0)
{
- strncpy(config.rpc_host, val, sizeof(config.rpc_host));
+ strncpy(config.rpc_host, val, sizeof(config.rpc_host)-1);
}
else if (strcmp(key, "rpc-port") == 0)
{
@@ -2798,7 +3435,7 @@ read_config(const char *config_file)
}
else if (strcmp(key, "wallet-rpc-host") == 0)
{
- strncpy(config.wallet_rpc_host, val, sizeof(config.rpc_host));
+ strncpy(config.wallet_rpc_host, val, sizeof(config.rpc_host)-1);
}
else if (strcmp(key, "wallet-rpc-port") == 0)
{
@@ -2810,7 +3447,7 @@ read_config(const char *config_file)
}
else if (strcmp(key, "pool-wallet") == 0)
{
- strncpy(config.pool_wallet, val, sizeof(config.pool_wallet));
+ strncpy(config.pool_wallet, val, sizeof(config.pool_wallet)-1);
}
else if (strcmp(key, "pool-start-diff") == 0)
{
@@ -2842,7 +3479,7 @@ read_config(const char *config_file)
}
else if (strcmp(key, "log-file") == 0)
{
- strncpy(config.log_file, val, sizeof(config.log_file));
+ strncpy(config.log_file, val, sizeof(config.log_file)-1);
}
else if (strcmp(key, "block-notified") == 0)
{
@@ -2854,16 +3491,53 @@ read_config(const char *config_file)
}
else if (strcmp(key, "data-dir") == 0)
{
- strncpy(config.data_dir, val, sizeof(config.data_dir));
+ strncpy(config.data_dir, val, sizeof(config.data_dir)-1);
}
else if (strcmp(key, "pid-file") == 0)
{
- strncpy(config.pid_file, val, sizeof(config.pid_file));
+ strncpy(config.pid_file, val, sizeof(config.pid_file)-1);
}
else if (strcmp(key, "forked") == 0)
{
config.forked = atoi(val);
}
+ else if (strcmp(key, "trusted-listen") == 0)
+ {
+ strncpy(config.trusted_listen, val,
+ sizeof(config.trusted_listen)-1);
+ }
+ else if (strcmp(key, "trusted-port") == 0)
+ {
+ config.trusted_port = atoi(val);
+ }
+ else if (strcmp(key, "trusted-allowed") == 0)
+ {
+ char *temp = strdup(val);
+ char *search = temp;
+ char *s = config.trusted_allowed[0];
+ char *e = s + (MAX_DOWNSTREAM * MAX_HOST);
+ char *ip;
+ while ((ip = strsep(&search, " ,")) && s < e)
+ {
+ if (!strlen(ip))
+ continue;
+ strncpy(s, ip, MAX_HOST-1);
+ s += MAX_HOST;
+ }
+ free(temp);
+ }
+ else if (strcmp(key, "upstream-host") == 0)
+ {
+ strncpy(config.upstream_host, val, sizeof(config.upstream_host)-1);
+ }
+ else if (strcmp(key, "upstream-port") == 0)
+ {
+ config.upstream_port = atoi(val);
+ }
+ else if (strcmp(key, "pool-view-key") == 0 && strlen(val) == 64)
+ {
+ memcpy(config.pool_view_key, val, 64);
+ }
}
fclose(fp);
@@ -2886,14 +3560,44 @@ read_config(const char *config_file)
if (config.retarget_ratio < 0 || config.retarget_ratio > 1)
{
log_fatal("Set retarget-ratio to any rational value within range "
- "[0, 1]. Clients will receive new jobs earlier if their latest"
+ "[0, 1]. Miners will receive new jobs earlier if their latest"
" work is less than retarget-ratio percentage of potential.");
exit(-1);
}
-
+ if (*config.upstream_host
+ && strcmp(config.upstream_host, config.pool_listen) == 0
+ && config.upstream_port == config.pool_port)
+ {
+ log_fatal("Cannot point upstream to the pool. Aborting.");
+ exit(-1);
+ }
+ if (*config.upstream_host
+ && strcmp(config.upstream_host, config.trusted_listen) == 0
+ && config.upstream_port == config.trusted_port)
+ {
+ log_fatal("Cannot point upstream to this trusted listener. Aborting.");
+ exit(-1);
+ }
}
+
static void print_config()
{
+ char display_allowed[MAX_HOST*MAX_DOWNSTREAM] = {0};
+ if (*config.trusted_allowed[0])
+ {
+ char *s = display_allowed;
+ char *e = display_allowed + sizeof(display_allowed);
+ char *f = config.trusted_allowed[0];
+ char *l = f + (MAX_DOWNSTREAM * MAX_HOST);
+ s = stecpy(s, f, e);
+ f += MAX_HOST;
+ while(*f && f < l)
+ {
+ s = stecpy(s, ",", e);
+ s = stecpy(s, f, e);
+ f += MAX_HOST;
+ }
+ }
log_info("\nCONFIG:\n"
" pool-listen = %s\n"
" pool-port = %u\n"
@@ -2917,7 +3621,12 @@ static void print_config()
" disable-self-select = %u\n"
" data-dir = %s\n"
" pid-file = %s\n"
- " forked = %u\n",
+ " forked = %u\n"
+ " trusted-listen = %s\n"
+ " trusted-port = %u\n"
+ " trusted-allowed = %s\n"
+ " upstream-host = %s\n"
+ " upstream-port = %u\n",
config.pool_listen,
config.pool_port,
config.pool_ssl_port,
@@ -2940,7 +3649,12 @@ static void print_config()
config.disable_self_select,
config.data_dir,
config.pid_file,
- config.forked);
+ config.forked,
+ config.trusted_listen,
+ config.trusted_port,
+ display_allowed,
+ config.upstream_host,
+ config.upstream_port);
}
static void
@@ -2957,16 +3671,81 @@ sigint_handler(int sig)
exit(0);
}
+static void *
+trusted_run(void *ctx)
+{
+ evutil_socket_t listener;
+ struct addrinfo *info;
+ int rc;
+ char port[6] = {0};
+
+ trusted_base = event_base_new();
+ if (!trusted_base)
+ {
+ log_fatal("Failed to create trusted event base");
+ return 0;
+ }
+
+ sprintf(port, "%d", config.trusted_port);
+ if ((rc = getaddrinfo(config.trusted_listen, port, 0, &info)))
+ {
+ log_fatal("Error parsing trusted listen address: %s",
+ gai_strerror(rc));
+ return 0;
+ }
+
+ listener = socket(info->ai_family, SOCK_STREAM, 0);
+ evutil_make_socket_nonblocking(listener);
+
+#ifndef WIN32
+ {
+ int one = 1;
+ setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+ }
+#endif
+
+ if (bind(listener, info->ai_addr, info->ai_addrlen) < 0)
+ {
+ perror("bind");
+ goto bail;
+ }
+
+ freeaddrinfo(info);
+ info = NULL;
+
+ if (listen(listener, 16)<0)
+ {
+ perror("listen");
+ goto bail;
+ }
+
+ trusted_event = event_new(trusted_base, listener, EV_READ|EV_PERSIST,
+ listener_on_accept, (void*)trusted_base);
+ if (event_add(trusted_event, NULL) != 0)
+ {
+ log_fatal("Failed to add trusted socket listener event");
+ goto bail;
+ }
+
+ event_base_dispatch(trusted_base);
+
+bail:
+ if (info)
+ freeaddrinfo(info);
+ event_base_free(trusted_base);
+ return 0;
+}
+
static void
run(void)
{
evutil_socket_t listener;
struct addrinfo *info;
int rc;
- char port[6];
+ char port[6] = {0};
- base = event_base_new();
- if (!base)
+ pool_base = event_base_new();
+ if (!pool_base)
{
log_fatal("Failed to create event base");
return;
@@ -2992,57 +3771,98 @@ run(void)
if (bind(listener, info->ai_addr, info->ai_addrlen) < 0)
{
perror("bind");
- return;
+ goto bail;
}
+ freeaddrinfo(info);
+ info = NULL;
+
if (listen(listener, 16)<0)
{
perror("listen");
- return;
+ goto bail;
}
- listener_event = event_new(base, listener, EV_READ|EV_PERSIST,
- client_on_accept, (void*)base);
+ listener_event = event_new(pool_base, listener, EV_READ|EV_PERSIST,
+ listener_on_accept, (void*)pool_base);
if (event_add(listener_event, NULL) != 0)
{
log_fatal("Failed to add socket listener event");
- return;
+ goto bail;
}
- signal_usr1 = evsignal_new(base, SIGUSR1, sigusr1_handler, NULL);
+ signal_usr1 = evsignal_new(pool_base, SIGUSR1, sigusr1_handler, NULL);
event_add(signal_usr1, NULL);
+
+ if (*config.trusted_listen && config.trusted_port)
+ {
+ log_info("Starting trusted listener on: %s:%d",
+ config.trusted_listen, config.trusted_port);
+ if (pthread_create(&trusted_th, NULL, trusted_run, NULL))
+ {
+ log_fatal("Cannot create trusted thread");
+ goto bail;
+ }
+ pthread_detach(trusted_th);
+ }
+
+ if (*config.upstream_host && config.upstream_port)
+ {
+ log_info("Starting upstream connection to: %s:%d",
+ config.upstream_host, config.upstream_port);
+ upstream_connect();
+ }
+
if (!config.block_notified)
{
- timer_120s = evtimer_new(base, timer_on_120s, NULL);
+ timer_120s = evtimer_new(pool_base, timer_on_120s, NULL);
timer_on_120s(-1, EV_TIMEOUT, NULL);
}
else
fetch_last_block_header();
-
fetch_view_key();
-
- timer_10m = evtimer_new(base, timer_on_10m, NULL);
+ timer_10m = evtimer_new(pool_base, timer_on_10m, NULL);
timer_on_10m(-1, EV_TIMEOUT, NULL);
+ if (*config.upstream_host)
+ {
+ timer_10s = evtimer_new(pool_base, timer_on_10s, NULL);
+ timer_30s = evtimer_new(pool_base, timer_on_30s, NULL);
+ timer_on_30s(-1, EV_TIMEOUT, NULL);
+ }
- event_base_dispatch(base);
+ event_base_dispatch(pool_base);
+
+bail:
+ if (info)
+ freeaddrinfo(info);
}
static void
cleanup(void)
{
log_info("Performing cleanup");
- if (listener_event)
- event_free(listener_event);
- if (config.webui_port)
- stop_web_ui();
- if (signal_usr1)
- event_free(signal_usr1);
+ if (timer_10s)
+ event_free(timer_10s);
+ if (timer_30s)
+ event_free(timer_30s);
if (timer_120s)
event_free(timer_120s);
if (timer_10m)
event_free(timer_10m);
- if (base)
- event_base_free(base);
+ if (listener_event)
+ event_free(listener_event);
+ if (trusted_event)
+ event_free(trusted_event);
+ if (upstream_event)
+ bufferevent_free(upstream_event);
+ if (config.webui_port)
+ stop_web_ui();
+ if (signal_usr1)
+ event_free(signal_usr1);
+ if (trusted_base)
+ event_base_loopbreak(trusted_base);
+ if (pool_base)
+ event_base_free(pool_base);
pool_clients_free();
if (bsh)
bstack_free(bsh);
@@ -3073,8 +3893,6 @@ print_help(struct option *opts)
int main(int argc, char **argv)
{
- int evthread_use_pthreads(void);
-
static struct option options[] =
{
{"config-file", required_argument, 0, 'c'},
@@ -3149,17 +3967,17 @@ int main(int argc, char **argv)
/* Any supplied command line options take precedent... */
if (log_file)
{
- strncpy(config.log_file, log_file, sizeof(config.log_file));
+ strncpy(config.log_file, log_file, sizeof(config.log_file)-1);
free(log_file);
}
if (data_dir)
{
- strncpy(config.data_dir, data_dir, sizeof(config.data_dir));
+ strncpy(config.data_dir, data_dir, sizeof(config.data_dir)-1);
free(data_dir);
}
if (pid_file)
{
- strncpy(config.pid_file, pid_file, sizeof(config.pid_file));
+ strncpy(config.pid_file, pid_file, sizeof(config.pid_file)-1);
free(pid_file);
}
if (forked > -1)
@@ -3178,7 +3996,7 @@ int main(int argc, char **argv)
}
print_config();
- log_info("Starting pool");
+ log_info("Starting pool on: %s:%d", config.pool_listen, config.pool_port);
if (config.forked)
{
@@ -3189,6 +4007,8 @@ int main(int argc, char **argv)
forkoff(pf);
}
+ int evthread_use_pthreads(void);
+
signal(SIGINT, sigint_handler);
signal(SIGPIPE, SIG_IGN);
atexit(cleanup);
@@ -3207,7 +4027,7 @@ int main(int argc, char **argv)
bn_ctx = BN_CTX_new();
base_diff = NULL;
BN_hex2bn(&base_diff,
- "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
+ "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
uuid_t iid;
uuid_generate(iid);
@@ -3219,7 +4039,7 @@ int main(int argc, char **argv)
uic.port = config.webui_port;
uic.pool_stats = &pool_stats;
uic.pool_fee = config.pool_fee;
- strncpy(uic.pool_listen, config.pool_listen, sizeof(uic.pool_listen));
+ strncpy(uic.pool_listen, config.pool_listen, sizeof(uic.pool_listen)-1);
uic.pool_port = config.pool_port;
uic.pool_ssl_port = config.pool_ssl_port;
uic.allow_self_select = !config.disable_self_select;
diff --git a/src/webui.c b/src/webui.c
index 1af4296..58ac5f1 100644
--- a/src/webui.c
+++ b/src/webui.c
@@ -159,7 +159,7 @@ thread_main(void *ctx)
int
start_web_ui(wui_context_t *context)
{
- log_debug("Starting Web UI");
+ log_info("Starting Web UI on %s:%d", context->pool_listen, context->port);
if (webui_base || handle)
{
log_error("Already running");
diff --git a/src/webui.h b/src/webui.h
index dfc42ca..6b9cb1e 100644
--- a/src/webui.h
+++ b/src/webui.h
@@ -50,13 +50,13 @@ typedef struct pool_stats_t
typedef struct wui_context_t
{
- uint32_t port;
+ uint16_t port;
pool_stats_t *pool_stats;
double pool_fee;
double payment_threshold;
- uint32_t pool_port;
+ uint16_t pool_port;
char pool_listen[256];
- uint32_t pool_ssl_port;
+ uint16_t pool_ssl_port;
unsigned allow_self_select;
} wui_context_t;
diff --git a/stratum-ss.md b/sss.md
similarity index 100%
rename from stratum-ss.md
rename to sss.md