diff --git a/src/pool.c b/src/pool.c index af3b602..e84e59c 100644 --- a/src/pool.c +++ b/src/pool.c @@ -177,7 +177,7 @@ typedef struct pool_clients_t size_t count; } pool_clients_t; -typedef struct share_t +typedef struct share_t { uint64_t height; uint64_t difficulty; @@ -185,7 +185,7 @@ typedef struct share_t time_t timestamp; } share_t; -typedef struct block_t +typedef struct block_t { uint64_t height; char hash[64]; @@ -211,53 +211,6 @@ struct rpc_callback_t void *data; }; -static int database_init(); -static void database_close(); -static int store_share(uint64_t height, share_t *share); -static int store_block(uint64_t height, block_t *block); -static int process_blocks(block_t *blocks, size_t count); -static int payout_block(block_t *block, MDB_txn *parent); -static int balance_add(const char *address, uint64_t amount, MDB_txn *parent); -static int send_payments(); -static int startup_pauout(uint64_t height); -static void update_pool_hr(); -static void pool_clients_init(); -static void pool_clients_free(); -static void pool_clients_send_job(); -static void target_to_hex(uint64_t target, char *target_hex); -static void stratum_get_proxy_job_body(char *body, const client_t *client, - const char *block_hex, bool response); -static void stratum_get_job_body(char *body, const client_t *client, bool response); -static inline void stratum_get_error_body(char *body, int json_id, const char *error); -static inline void stratum_get_status_body(char *body, int json_id, const char *status); -static void client_add(int fd, struct bufferevent *bev); -static void client_find(struct bufferevent *bev, client_t **client); -static void client_clear(struct bufferevent *bev); -static void client_send_job(client_t *client, bool response); -static void client_clear_jobs(client_t *client); -static job_t * client_find_job(client_t *client, const char *job_id); -static void response_to_block_template(json_object *result, block_template_t *block_template); -static void response_to_block(json_object *block_header, block_t *block); -static void rpc_get_request_body(char *body, const char* method, char* fmt, ...); -static void rpc_on_response(struct evhttp_request *req, void *arg); -static void rpc_request(struct event_base *base, const char *body, rpc_callback_t *callback); -static void rpc_wallet_request(struct event_base *base, const char *body, rpc_callback_t *callback); -static void rpc_on_block_template(const char* data, rpc_callback_t *callback); -static void rpc_on_block_headers_range(const char* data, rpc_callback_t *callback); -static void rpc_on_block_header_by_height(const char* data, rpc_callback_t *callback); -static void rpc_on_last_block_header(const char* data, rpc_callback_t *callback); -static void rpc_on_block_submitted(const char* data, rpc_callback_t *callback); -static void rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback); -static void timer_on_120s(int fd, short kind, void *ctx); -static void timer_on_10m(int fd, short kind, void *ctx); -static void client_on_login(json_object *message, client_t *client); -static void client_on_submit(json_object *message, client_t *client); -static void client_on_read(struct bufferevent *bev, void *ctx); -static void client_on_error(struct bufferevent *bev, short error, void *ctx); -static void client_on_accept(evutil_socket_t listener, short event, void *arg); -static void send_validation_error(const client_t *client, const char *message); -static void sigusr1_handler(evutil_socket_t fd, short event, void *arg); - static config_t config; static pool_clients_t pool_clients; static bstack_t *bst; @@ -339,7 +292,8 @@ compare_block(const MDB_val *a, const MDB_val *b) const block_t *vb = (const block_t*) b->mv_data; int sc = memcmp(va->hash, vb->hash, 64); if (sc == 0) - return (va->timestamp < vb->timestamp) ? -1 : va->timestamp > vb->timestamp; + return (va->timestamp < vb->timestamp) ? -1 : + va->timestamp > vb->timestamp; else return sc; } @@ -351,7 +305,8 @@ compare_share(const MDB_val *a, const MDB_val *b) const share_t *vb = (const share_t*) b->mv_data; int sc = strcmp(va->address, vb->address); if (sc == 0) - return (va->timestamp < vb->timestamp) ? -1 : va->timestamp > vb->timestamp; + return (va->timestamp < vb->timestamp) ? -1 : + va->timestamp > vb->timestamp; else return sc; } @@ -361,7 +316,8 @@ compare_payment(const MDB_val *a, const MDB_val *b) { const payment_t *va = (const payment_t*) a->mv_data; const payment_t *vb = (const payment_t*) b->mv_data; - return (va->timestamp < vb->timestamp) ? -1 : va->timestamp > vb->timestamp; + return (va->timestamp < vb->timestamp) ? -1 : + va->timestamp > vb->timestamp; } static int @@ -386,25 +342,28 @@ database_init() log_fatal("%s\n", err); exit(rc); } - if ((rc = mdb_dbi_open(txn, "shares", MDB_INTEGERKEY | MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, &db_shares)) != 0) + uint32_t flags = MDB_INTEGERKEY | MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; + if ((rc = mdb_dbi_open(txn, "shares", flags, &db_shares)) != 0) { err = mdb_strerror(rc); log_fatal("%s\n", err); exit(rc); } - if ((rc = mdb_dbi_open(txn, "blocks", MDB_INTEGERKEY | MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, &db_blocks)) != 0) + if ((rc = mdb_dbi_open(txn, "blocks", flags, &db_blocks)) != 0) { err = mdb_strerror(rc); log_fatal("%s\n", err); exit(rc); } - if ((rc = mdb_dbi_open(txn, "payments", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, &db_payments)) != 0) + flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED; + if ((rc = mdb_dbi_open(txn, "payments", flags, &db_payments)) != 0) { err = mdb_strerror(rc); log_fatal("%s\n", err); exit(rc); } - if ((rc = mdb_dbi_open(txn, "balance", MDB_CREATE, &db_balance)) != 0) + flags = MDB_CREATE; + if ((rc = mdb_dbi_open(txn, "balance", flags, &db_balance)) != 0) { err = mdb_strerror(rc); log_fatal("%s\n", err); @@ -416,7 +375,7 @@ database_init() mdb_set_compare(txn, db_blocks, compare_uint64); mdb_set_dupsort(txn, db_blocks, compare_block); - + mdb_set_compare(txn, db_payments, compare_string); mdb_set_dupsort(txn, db_payments, compare_payment); @@ -503,7 +462,8 @@ miner_hr(const char *address) uint64_t hr = 0; for (size_t i = 0; i < pool_clients.count; i++, c++) { - if (c->connected_since != 0 && strncmp(c->address, address, ADDRESS_MAX) == 0) + if (c->connected_since != 0 + && strncmp(c->address, address, ADDRESS_MAX) == 0) { double d = difftime(time(NULL), c->connected_since); if (d == 0.0) @@ -561,173 +521,6 @@ cleanup: return balance; } -static int -process_blocks(block_t *blocks, size_t count) -{ - log_debug("Processing blocks"); - /* - For each block, lookup block in db. - If found, make sure found is locked and not orphaned. - If both not orphaned and unlocked, payout, set unlocked. - If block heights differ / orphaned, set orphaned. - */ - int rc; - char *err; - MDB_txn *txn; - MDB_cursor *cursor; - if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) - { - err = mdb_strerror(rc); - log_error("%s", err); - return rc; - } - if ((rc = mdb_cursor_open(txn, db_blocks, &cursor)) != 0) - { - err = mdb_strerror(rc); - log_error("%s", err); - mdb_txn_abort(txn); - return rc; - } - - for (int i=0; iheight); - MDB_val key = { sizeof(ib->height), (void*)&ib->height }; - MDB_val val; - MDB_cursor_op op = MDB_SET; - while (1) - { - rc = mdb_cursor_get(cursor, &key, &val, op); - op = MDB_NEXT_DUP; - if (rc == MDB_NOTFOUND || rc != 0) - { - log_trace("No stored block at height %"PRIu64, ib->height); - if (rc != MDB_NOTFOUND) - { - err = mdb_strerror(rc); - log_debug("No stored block at height %"PRIu64" with error: %d", ib->height, err); - } - break; - } - block_t *sb = (block_t*)val.mv_data; - if (sb->status != BLOCK_LOCKED) - { - continue; - } - if (ib->status & BLOCK_ORPHANED) - { - log_debug("Orphaned block at height %"PRIu64, ib->height); - block_t bp; - memcpy(&bp, sb, sizeof(block_t)); - bp.status |= BLOCK_ORPHANED; - MDB_val new_val = {sizeof(block_t), (void*)&bp}; - mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); - continue; - } - if (memcmp(ib->hash, sb->hash, 64) == 0 && memcmp(ib->prev_hash, sb->prev_hash, 64) != 0) - { - log_warn("Have a block with matching heights but differing parents! Setting orphaned.\n"); - block_t bp; - memcpy(&bp, sb, sizeof(block_t)); - bp.status |= BLOCK_ORPHANED; - MDB_val new_val = {sizeof(block_t), (void*)&bp}; - mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); - continue; - } - block_t bp; - memcpy(&bp, sb, sizeof(block_t)); - bp.status |= BLOCK_UNLOCKED; - bp.reward = ib->reward; - rc = payout_block(&bp, txn); - if (rc == 0) - { - log_debug("Paided out block %"PRIu64, bp.height); - MDB_val new_val = {sizeof(block_t), (void*)&bp}; - mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); - } - else - log_trace("%s", mdb_strerror(rc)); - } - } - - rc = mdb_txn_commit(txn); - return rc; -} - -static int -payout_block(block_t *block, MDB_txn *parent) -{ - /* - PPLNS - */ - log_info("Payout on block at height %"PRIu64, block->height); - int rc; - char *err; - MDB_txn *txn; - MDB_cursor *cursor; - uint64_t height = block->height; - uint64_t total_paid = 0; - if ((rc = mdb_txn_begin(env, parent, 0, &txn)) != 0) - { - err = mdb_strerror(rc); - log_error("%s", err); - return rc; - } - if ((rc = mdb_cursor_open(txn, db_shares, &cursor)) != 0) - { - err = mdb_strerror(rc); - log_error("%s", err); - mdb_txn_abort(txn); - return rc; - } - - MDB_cursor_op op = MDB_SET; - while (1) - { - uint64_t current_height = height; - MDB_val key = { sizeof(current_height), (void*)¤t_height }; - MDB_val val; - rc = mdb_cursor_get(cursor, &key, &val, op); - op = MDB_NEXT_DUP; - if (rc == MDB_NOTFOUND && total_paid < block->reward) - { - if (height == 0) - break; - height--; - op = MDB_SET; - continue; - } - if (rc != 0 && rc != MDB_NOTFOUND) - { - log_error("Error getting balance: %s", mdb_strerror(rc)); - break; - } - if (total_paid == block->reward) - break; - - share_t *share = (share_t*)val.mv_data; - uint64_t amount = floor((double)share->difficulty / ((double)block->difficulty * config.share_mul) * block->reward); - if (total_paid + amount > block->reward) - amount = block->reward - total_paid; - total_paid += amount; - uint64_t fee = amount * config.pool_fee; - amount -= fee; - if (amount == 0) - continue; - rc = balance_add(share->address, amount, txn); - if (rc != 0) - { - mdb_cursor_close(cursor); - mdb_txn_abort(txn); - return rc; - } - } - - rc = mdb_txn_commit(txn); - return rc; -} - static int balance_add(const char *address, uint64_t amount, MDB_txn *parent) { @@ -785,20 +578,25 @@ balance_add(const char *address, uint64_t amount, MDB_txn *parent) } static int -send_payments() +payout_block(block_t *block, MDB_txn *parent) { - uint64_t threshold = 1000000000000 * config.payment_threshold; + /* + PPLNS + */ + log_info("Payout on block at height %"PRIu64, block->height); int rc; char *err; MDB_txn *txn; MDB_cursor *cursor; - if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)) != 0) + uint64_t height = block->height; + uint64_t total_paid = 0; + if ((rc = mdb_txn_begin(env, parent, 0, &txn)) != 0) { err = mdb_strerror(rc); log_error("%s", err); return rc; } - if ((rc = mdb_cursor_open(txn, db_balance, &cursor)) != 0) + if ((rc = mdb_cursor_open(txn, db_shares, &cursor)) != 0) { err = mdb_strerror(rc); log_error("%s", err); @@ -806,79 +604,751 @@ send_payments() return rc; } - size_t payments_count = 0; - size_t payments_max_count = 25; - size_t payments_size = payments_max_count * sizeof(payment_t); - payment_t *payments = (payment_t*) calloc(1, payments_size); - memset(payments, 0, payments_size); - payment_t *payment = payments; - payment_t *end_payment = payment + payments_max_count; - - MDB_cursor_op op = MDB_FIRST; + MDB_cursor_op op = MDB_SET; while (1) { - MDB_val key; + uint64_t current_height = height; + MDB_val key = { sizeof(current_height), (void*)¤t_height }; MDB_val val; rc = mdb_cursor_get(cursor, &key, &val, op); - op = MDB_NEXT; - if (rc != 0) + op = MDB_NEXT_DUP; + if (rc == MDB_NOTFOUND && total_paid < block->reward) + { + if (height == 0) + break; + height--; + op = MDB_SET; + continue; + } + if (rc != 0 && rc != MDB_NOTFOUND) + { + log_error("Error getting balance: %s", mdb_strerror(rc)); + break; + } + if (total_paid == block->reward) break; - const char *address = (const char*)key.mv_data; - uint64_t amount = *(uint64_t*)val.mv_data; - - if (amount < threshold) + share_t *share = (share_t*)val.mv_data; + uint64_t amount = floor((double)share->difficulty / + ((double)block->difficulty * config.share_mul) * block->reward); + if (total_paid + amount > block->reward) + amount = block->reward - total_paid; + total_paid += amount; + uint64_t fee = amount * config.pool_fee; + amount -= fee; + if (amount == 0) continue; - - log_info("Sending payment of %"PRIu64" to %s\n", amount, address); - - memcpy(payment->address, address, ADDRESS_MAX); - payment->amount = amount; - payments_count++; - - if (++payment == end_payment) + rc = balance_add(share->address, amount, txn); + if (rc != 0) { - payments_size <<= 1; - payments = (payment_t*) realloc(payments, payments_size); - payment = payments + payments_max_count; - memset(payment, 0, sizeof(payment_t) * payments_max_count); - payments_max_count <<= 1; - end_payment = payments + payments_max_count; + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + return rc; } } - mdb_cursor_close(cursor); - mdb_txn_abort(txn); - if (payments_count) + rc = mdb_txn_commit(txn); + return rc; +} + +static int +process_blocks(block_t *blocks, size_t count) +{ + log_debug("Processing blocks"); + /* + For each block, lookup block in db. + If found, make sure found is locked and not orphaned. + If both not orphaned and unlocked, payout, set unlocked. + If block heights differ / orphaned, set orphaned. + */ + int rc; + char *err; + MDB_txn *txn; + MDB_cursor *cursor; + if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) { - size_t body_size = 160 * payments_count + 128; - char *body = (char*) alloca(body_size); - char *start = body; - char *end = body + body_size; - start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer_split\",\"params\":{" - "\"ring_size\":11,\"destinations\":[", end); - for (size_t i=0; iheight); + MDB_val key = { sizeof(ib->height), (void*)&ib->height }; + MDB_val val; + MDB_cursor_op op = MDB_SET; + while (1) { - payment_t *p = &payments[i]; - start = stecpy(start, "{\"address\":\"", end); - start = stecpy(start, p->address, end); - start = stecpy(start, "\",\"amount\":", end); - sprintf(start, "%"PRIu64"}", p->amount); - start = body + strlen(body); - if (i != payments_count -1) - start = stecpy(start, ",", end); + rc = mdb_cursor_get(cursor, &key, &val, op); + op = MDB_NEXT_DUP; + if (rc == MDB_NOTFOUND || rc != 0) + { + log_trace("No stored block at height %"PRIu64, ib->height); + if (rc != MDB_NOTFOUND) + { + err = mdb_strerror(rc); + log_debug("No stored block at height %"PRIu64 + " with error: %d", + ib->height, err); + } + break; + } + block_t *sb = (block_t*)val.mv_data; + if (sb->status != BLOCK_LOCKED) + { + continue; + } + if (ib->status & BLOCK_ORPHANED) + { + log_debug("Orphaned block at height %"PRIu64, ib->height); + block_t bp; + memcpy(&bp, sb, sizeof(block_t)); + bp.status |= BLOCK_ORPHANED; + MDB_val new_val = {sizeof(block_t), (void*)&bp}; + mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); + continue; + } + if (memcmp(ib->hash, sb->hash, 64) == 0 + && memcmp(ib->prev_hash, sb->prev_hash, 64) != 0) + { + log_warn("Block with matching heights but differing parents! " + "Setting orphaned.\n"); + block_t bp; + memcpy(&bp, sb, sizeof(block_t)); + bp.status |= BLOCK_ORPHANED; + MDB_val new_val = {sizeof(block_t), (void*)&bp}; + mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); + continue; + } + block_t bp; + memcpy(&bp, sb, sizeof(block_t)); + bp.status |= BLOCK_UNLOCKED; + bp.reward = ib->reward; + rc = payout_block(&bp, txn); + if (rc == 0) + { + log_debug("Paided out block %"PRIu64, bp.height); + MDB_val new_val = {sizeof(block_t), (void*)&bp}; + mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); + } else - start = stecpy(start, "]}}", end); + log_trace("%s", mdb_strerror(rc)); } - log_trace(body); - rpc_callback_t *cb = rpc_callback_new( - rpc_on_wallet_transferred, payments); - rpc_wallet_request(base, body, cb); + } + + rc = mdb_txn_commit(txn); + return rc; +} + +static void +update_pool_hr() +{ + uint64_t hr = 0; + client_t *c = pool_clients.clients; + for (size_t i = 0; i < pool_clients.count; i++, c++) + { + if (c->connected_since != 0) + { + double d = difftime(time(NULL), c->connected_since); + if (d == 0.0) + continue; + hr += c->hashes / d; + } + } + pool_stats.pool_hashrate = hr; +} + +static void +template_recycle(void *item) +{ + block_template_t *bt = (block_template_t*) item; + log_trace("Recycle block template at height: %"PRIu64, bt->height); + if (bt->blockhashing_blob) + { + free(bt->blockhashing_blob); + bt->blockhashing_blob = NULL; + } + if (bt->blocktemplate_blob) + { + free(bt->blocktemplate_blob); + bt->blocktemplate_blob = NULL; + } +} + +static void +target_to_hex(uint64_t target, char *target_hex) +{ + if (target & 0xFFFFFFFF00000000) + { + log_debug("High target requested: %"PRIu64, target); + bin_to_hex((const char*)&target, 8, &target_hex[0], 16); + target_hex[16] = '\0'; + return; + } + BIGNUM *diff = BN_new(); + BIGNUM *bnt = BN_new(); +#ifdef SIXTY_FOUR_BIT_LONG + BN_set_word(bnt, target); +#else + char st[24]; + snprintf(st, 24, "%"PRIu64, target); + BN_dec2bn(&bnt, st); +#endif + BN_div(diff, NULL, base_diff, bnt, bn_ctx); + BN_rshift(diff, diff, 224); + uint32_t w = BN_get_word(diff); + bin_to_hex((const char*)&w, 4, &target_hex[0], 8); + target_hex[8] = '\0'; + BN_free(bnt); + BN_free(diff); +} + +static void +stratum_get_proxy_job_body(char *body, const client_t *client, + const char *block_hex, bool response) +{ + int json_id = client->json_id; + const char *client_id = client->client_id; + const job_t *job = &client->active_jobs[0]; + char job_id[33]; + bin_to_hex((const char*)job->id, sizeof(uuid_t), job_id, 32); + uint64_t target = job->target; + char target_hex[17]; + target_to_hex(target, &target_hex[0]); + const block_template_t *bt = job->block_template; + + if (response) + { + snprintf(body, JOB_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\"," + "\"error\":null,\"result\"" + ":{\"id\":\"%.32s\",\"job\":{\"blocktemplate_blob\":\"%s\"," + "\"job_id\":\"%.32s\"," + "\"difficulty\":%"PRIu64",\"height\":%"PRIu64"," + "\"reserved_offset\":%u," + "\"client_nonce_offset\":%u,\"client_pool_offset\":%u," + "\"target_diff\":%"PRIu64",\"target_diff_hex\":\"%s\"}," + "\"status\":\"OK\"}}\n", + json_id, client_id, block_hex, job_id, + bt->difficulty, bt->height, bt->reserved_offset, + bt->reserved_offset + 12, + bt->reserved_offset + 8, target, target_hex); } else - free(payments); + { + snprintf(body, JOB_BODY_MAX, "{\"jsonrpc\":\"2.0\",\"method\":" + "\"job\",\"params\"" + ":{\"id\":\"%.32s\",\"job\":{\"blocktemplate_blob\":\"%s\"," + "\"job_id\":\"%.32s\"," + "\"difficulty\":%"PRIu64",\"height\":%"PRIu64"," + "\"reserved_offset\":%u," + "\"client_nonce_offset\":%u,\"client_pool_offset\":%u," + "\"target_diff\":%"PRIu64",\"target_diff_hex\":\"%s\"}," + "\"status\":\"OK\"}}\n", client_id, block_hex, job_id, + bt->difficulty, bt->height, + bt->reserved_offset, bt->reserved_offset + 12, + bt->reserved_offset + 8, target, target_hex); + } +} - return 0; +static void +stratum_get_job_body(char *body, const client_t *client, bool response) +{ + int json_id = client->json_id; + const char *client_id = client->client_id; + const job_t *job = &client->active_jobs[0]; + char job_id[33]; + bin_to_hex((const char*)job->id, sizeof(uuid_t), job_id, 32); + const char *blob = job->blob; + uint64_t target = job->target; + uint64_t height = job->block_template->height; + char target_hex[17]; + target_to_hex(target, &target_hex[0]); + + if (response) + { + snprintf(body, JOB_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\"," + "\"error\":null,\"result\"" + ":{\"id\":\"%.32s\",\"job\":{" + "\"blob\":\"%s\",\"job_id\":\"%.32s\",\"target\":\"%s\"," + "\"height\":%"PRIu64"}," + "\"status\":\"OK\"}}\n", + json_id, client_id, blob, job_id, target_hex, height); + } + else + { + snprintf(body, JOB_BODY_MAX, "{\"jsonrpc\":\"2.0\",\"method\":" + "\"job\",\"params\"" + ":{\"id\":\"%.32s\",\"blob\":\"%s\",\"job_id\":\"%.32s\"," + "\"target\":\"%s\"," + "\"height\":%"PRIu64"}}\n", + client_id, blob, job_id, target_hex, height); + } +} + +static inline void +stratum_get_error_body(char *body, int json_id, const char *error) +{ + snprintf(body, ERROR_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\"," + "\"error\":" + "{\"code\":-1, \"message\":\"%s\"}}\n", json_id, error); +} + +static inline void +stratum_get_status_body(char *body, int json_id, const char *status) +{ + snprintf(body, STATUS_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\"," + "\"error\":null,\"result\":{\"status\":\"%s\"}}\n", + json_id, status); +} + +static void +send_validation_error(const client_t *client, const char *message) +{ + struct evbuffer *output = bufferevent_get_output(client->bev); + char body[ERROR_BODY_MAX]; + stratum_get_error_body(body, client->json_id, message); + evbuffer_add(output, body, strlen(body)); + log_debug("Validation error: %s", message); +} + +static void +client_clear_jobs(client_t *client) +{ + for (size_t i=0; iactive_jobs[i]; + if (job->blob != NULL) + { + free(job->blob); + job->blob = NULL; + } + if (job->submissions != NULL) + { + free(job->submissions); + job->submissions = NULL; + job->submissions_count = 0; + } + } +} + +static job_t * +client_find_job(client_t *client, const char *job_id) +{ + uuid_t jid; + hex_to_bin(job_id, (char*)&jid, sizeof(uuid_t)); + for (size_t i=0; iactive_jobs[i]; + if (memcmp(job->id, jid, sizeof(uuid_t)) == 0) + return job; + } + return NULL; +} + +static void +client_send_job(client_t *client, bool response) +{ + /* First cycle jobs */ + job_t *last = &client->active_jobs[CLIENT_JOBS_MAX-1]; + if (last->blob != NULL) + { + free(last->blob); + last->blob = NULL; + } + if (last->submissions != NULL) + { + free(last->submissions); + last->submissions = NULL; + last->submissions_count = 0; + } + for (size_t i=CLIENT_JOBS_MAX-1; i>0; i--) + { + job_t *current = &client->active_jobs[i]; + job_t *prev = &client->active_jobs[i-1]; + memcpy(current, prev, sizeof(job_t)); + } + job_t *job = &client->active_jobs[0]; + memset(job, 0, sizeof(job_t)); + + /* Quick check we actually have a block template */ + block_template_t *bt = bstack_peek(bst); + if (!bt) + { + log_warn("Cannot send client a job: No block template"); + return; + } + + /* + 1. Convert blocktemplate_blob to binary + 2. Update bytes in reserved space at reserved_offset + 3. Get block hashing blob for job + 4. Send + */ + + /* Convert template to blob */ + size_t bin_size = strlen(bt->blocktemplate_blob) >> 1; + char *block = calloc(bin_size, sizeof(char)); + hex_to_bin(bt->blocktemplate_blob, block, bin_size); + + /* Set the extra nonce in our reserved space */ + char *p = block; + p += bt->reserved_offset; + ++extra_nonce; + memcpy(p, &extra_nonce, sizeof(extra_nonce)); + job->extra_nonce = extra_nonce; + + /* Get hashong blob */ + size_t hashing_blob_size; + char *hashing_blob = NULL; + get_hashing_blob(block, bin_size, &hashing_blob, &hashing_blob_size); + + /* Make hex */ + job->blob = calloc((hashing_blob_size << 1) +1, sizeof(char)); + bin_to_hex(hashing_blob, hashing_blob_size, job->blob, + hashing_blob_size << 1); + log_trace("Miner hashing blob: %s", job->blob); + + /* Save a job id */ + uuid_generate(job->id); + + /* Hold reference to block template */ + job->block_template = bt; + + /* Send */ + char job_id[33]; + bin_to_hex((const char*)job->id, sizeof(uuid_t), job_id, 32); + + /* Retarget */ + double duration = difftime(time(NULL), client->connected_since); + uint8_t retarget_time = client->is_proxy ? 5 : 120; + uint64_t target = fmax((double)client->hashes / + duration * retarget_time, config.pool_start_diff); + job->target = target; + log_debug("Client %.32s target now %"PRIu64, client->client_id, target); + + char body[JOB_BODY_MAX]; + if (!client->is_proxy) + { + stratum_get_job_body(body, client, response); + } + else + { + char *block_hex = calloc(bin_size+1, sizeof(char)); + bin_to_hex(block, bin_size, block_hex, bin_size << 1); + stratum_get_proxy_job_body(body, client, block_hex, response); + free(block_hex); + } + log_trace("Client job: %s", body); + struct evbuffer *output = bufferevent_get_output(client->bev); + evbuffer_add(output, body, strlen(body)); + free(block); + free(hashing_blob); +} + +static void +pool_clients_send_job() +{ + client_t *c = pool_clients.clients; + for (size_t i = 0; i < pool_clients.count; i++, c++) + { + if (c->fd == 0) + continue; + client_send_job(c, false); + } +} + +static void +pool_clients_init() +{ + assert(pool_clients.count == 0); + pool_clients.count = POOL_CLIENTS_GROW; + pool_clients.clients = (client_t*) calloc(pool_clients.count, + sizeof(client_t)); +} + +static void +pool_clients_free() +{ + assert(pool_clients.count != 0); + client_t *c = pool_clients.clients; + for (size_t i = 0; i < pool_clients.count; i++, c++) + { + client_clear_jobs(c); + } + free(pool_clients.clients); +} + +static void +response_to_block_template(json_object *result, + block_template_t *block_template) +{ + JSON_GET_OR_WARN(blockhashing_blob, result, json_type_string); + JSON_GET_OR_WARN(blocktemplate_blob, result, json_type_string); + JSON_GET_OR_WARN(difficulty, result, json_type_int); + JSON_GET_OR_WARN(height, result, json_type_int); + JSON_GET_OR_WARN(prev_hash, result, json_type_string); + JSON_GET_OR_WARN(reserved_offset, result, json_type_int); + block_template->blockhashing_blob = strdup( + json_object_get_string(blockhashing_blob)); + block_template->blocktemplate_blob = strdup( + json_object_get_string(blocktemplate_blob)); + block_template->difficulty = json_object_get_int64(difficulty); + block_template->height = json_object_get_int64(height); + memcpy(block_template->prev_hash, json_object_get_string(prev_hash), 64); + block_template->reserved_offset = json_object_get_int(reserved_offset); +} + +static void +response_to_block(json_object *block_header, block_t *block) +{ + memset(block, 0, sizeof(block_t)); + JSON_GET_OR_WARN(height, block_header, json_type_int); + JSON_GET_OR_WARN(difficulty, block_header, json_type_int); + JSON_GET_OR_WARN(hash, block_header, json_type_string); + JSON_GET_OR_WARN(prev_hash, block_header, json_type_string); + JSON_GET_OR_WARN(timestamp, block_header, json_type_int); + JSON_GET_OR_WARN(reward, block_header, json_type_int); + JSON_GET_OR_WARN(orphan_status, block_header, json_type_boolean); + block->height = json_object_get_int64(height); + block->difficulty = json_object_get_int64(difficulty); + memcpy(block->hash, json_object_get_string(hash), 64); + memcpy(block->prev_hash, json_object_get_string(prev_hash), 64); + block->timestamp = json_object_get_int64(timestamp); + block->reward = json_object_get_int64(reward); + if (json_object_get_int(orphan_status)) + block->status |= BLOCK_ORPHANED; +} + +static void +rpc_on_response(struct evhttp_request *req, void *arg) +{ + struct evbuffer *input; + rpc_callback_t *callback = (rpc_callback_t*) arg; + + if (!req) + { + log_error("Request failure. Aborting."); + rpc_callback_free(callback); + return; + } + + int rc = evhttp_request_get_response_code(req); + if (rc < 200 || rc >= 300) + { + log_error("HTTP status code %d for %s. Aborting.", + rc, evhttp_request_get_uri(req)); + rpc_callback_free(callback); + return; + } + + input = evhttp_request_get_input_buffer(req); + size_t len = evbuffer_get_length(input); + char body[len+1]; + evbuffer_remove(input, body, len); + body[len] = '\0'; + callback->f(body, callback); + rpc_callback_free(callback); +} + +static void +rpc_request(struct event_base *base, const char *body, rpc_callback_t *callback) +{ + struct evhttp_connection *con; + struct evhttp_request *req; + struct evkeyvalq *headers; + struct evbuffer *output; + + con = evhttp_connection_base_new(base, NULL, + config.rpc_host, config.rpc_port); + evhttp_connection_free_on_completion(con); + evhttp_connection_set_timeout(con, config.rpc_timeout); + req = evhttp_request_new(rpc_on_response, callback); + output = evhttp_request_get_output_buffer(req); + evbuffer_add(output, body, strlen(body)); + headers = evhttp_request_get_output_headers(req); + evhttp_add_header(headers, "Content-Type", "application/json"); + evhttp_add_header(headers, "Connection", "close"); + evhttp_make_request(con, req, EVHTTP_REQ_POST, RPC_PATH); +} + +static void +rpc_wallet_request(struct event_base *base, const char *body, + rpc_callback_t *callback) +{ + struct evhttp_connection *con; + struct evhttp_request *req; + struct evkeyvalq *headers; + struct evbuffer *output; + + con = evhttp_connection_base_new(base, NULL, + config.wallet_rpc_host, config.wallet_rpc_port); + evhttp_connection_free_on_completion(con); + evhttp_connection_set_timeout(con, config.rpc_timeout); + req = evhttp_request_new(rpc_on_response, callback); + output = evhttp_request_get_output_buffer(req); + evbuffer_add(output, body, strlen(body)); + headers = evhttp_request_get_output_headers(req); + evhttp_add_header(headers, "Content-Type", "application/json"); + evhttp_add_header(headers, "Connection", "close"); + evhttp_make_request(con, req, EVHTTP_REQ_POST, RPC_PATH); +} + +static void +rpc_get_request_body(char *body, const char* method, char* fmt, ...) +{ + char *pb = body; + + snprintf(pb, RPC_BODY_MAX, "%s%s%s", "{\"jsonrpc\":\"2.0\",\"id\":\"0\"," + "\"method\":\"", method, "\""); + pb += strlen(pb); + + if (fmt && *fmt) + { + char *s; + uint64_t d; + snprintf(pb, RPC_BODY_MAX - strlen(body), "%s", ",\"params\":{"); + pb += strlen(pb); + va_list args; + va_start(args, fmt); + uint8_t count = 0; + while (*fmt) + { + switch (*fmt++) + { + case 's': + s = va_arg(args, char *); + snprintf(pb, RPC_BODY_MAX - strlen(body), "\"%s\"", s); + pb += strlen(pb); + break; + case 'd': + d = va_arg(args, uint64_t); + snprintf(pb, RPC_BODY_MAX - strlen(body), "%"PRIu64, d); + pb += strlen(pb); + break; + } + *pb++ = count++ % 2 ? ',' : ':'; + } + va_end(args); + *--pb = '}'; + pb++; + } + *pb++ = '}'; + *pb = '\0'; + log_trace("Payload: %s", body); +} + +static void +rpc_on_block_header_by_height(const char* data, rpc_callback_t *callback) +{ + log_trace("Got block header by height: \n%s", data); + json_object *root = json_tokener_parse(data); + JSON_GET_OR_WARN(result, root, json_type_object); + JSON_GET_OR_WARN(status, result, json_type_string); + const char *ss = json_object_get_string(status); + json_object *error = NULL; + json_object_object_get_ex(root, "error", &error); + if (error != NULL) + { + JSON_GET_OR_WARN(code, error, json_type_object); + JSON_GET_OR_WARN(message, error, json_type_string); + int ec = json_object_get_int(code); + const char *em = json_object_get_string(message); + log_error("Error (%d) getting block header by height: %s", ec, em); + json_object_put(root); + return; + } + if (status == NULL || strcmp(ss, "OK") != 0) + { + log_error("Error getting block header by height: %s", ss); + json_object_put(root); + return; + } + block_t rb; + JSON_GET_OR_WARN(block_header, result, json_type_object); + response_to_block(block_header, &rb); + process_blocks(&rb, 1); + json_object_put(root); +} + +static void +rpc_on_block_headers_range(const char* data, rpc_callback_t *callback) +{ + json_object *root = json_tokener_parse(data); + JSON_GET_OR_WARN(result, root, json_type_object); + JSON_GET_OR_WARN(status, result, json_type_string); + const char *ss = json_object_get_string(status); + json_object *error = NULL; + json_object_object_get_ex(root, "error", &error); + if (error != NULL) + { + JSON_GET_OR_WARN(code, error, json_type_object); + JSON_GET_OR_WARN(message, error, json_type_string); + int ec = json_object_get_int(code); + const char *em = json_object_get_string(message); + log_warn("Error (%d) getting block headers by range: %s", ec, em); + json_object_put(root); + return; + } + if (status == NULL || strcmp(ss, "OK") != 0) + { + log_warn("Error getting block headers by range: %s", ss); + json_object_put(root); + return; + } + + JSON_GET_OR_WARN(headers, result, json_type_array); + size_t headers_len = json_object_array_length(headers); + assert(headers_len == BLOCK_HEADERS_RANGE); + for (int i=0; iheight); + rpc_get_request_body(body, "get_block_header_by_height", "sd", + "height", block->height); rpc_callback_t *cb = rpc_callback_new( rpc_on_block_header_by_height, NULL); rpc_request(base, body, cb); @@ -945,451 +1416,6 @@ startup_pauout(uint64_t height) return 0; } -static void -update_pool_hr() -{ - uint64_t hr = 0; - client_t *c = pool_clients.clients; - for (size_t i = 0; i < pool_clients.count; i++, c++) - { - if (c->connected_since != 0) - { - double d = difftime(time(NULL), c->connected_since); - if (d == 0.0) - continue; - hr += c->hashes / d; - } - } - pool_stats.pool_hashrate = hr; -} - -static void -template_recycle(void *item) -{ - block_template_t *bt = (block_template_t*) item; - log_trace("Recycle block template at height: %"PRIu64, bt->height); - if (bt->blockhashing_blob) - { - free(bt->blockhashing_blob); - bt->blockhashing_blob = NULL; - } - if (bt->blocktemplate_blob) - { - free(bt->blocktemplate_blob); - bt->blocktemplate_blob = NULL; - } -} - -static void -pool_clients_init() -{ - assert(pool_clients.count == 0); - pool_clients.count = POOL_CLIENTS_GROW; - pool_clients.clients = (client_t*) calloc(pool_clients.count, sizeof(client_t)); -} - -static void -pool_clients_free() -{ - assert(pool_clients.count != 0); - client_t *c = pool_clients.clients; - for (size_t i = 0; i < pool_clients.count; i++, c++) - { - client_clear_jobs(c); - } - free(pool_clients.clients); -} - -static job_t * -client_find_job(client_t *client, const char *job_id) -{ - uuid_t jid; - hex_to_bin(job_id, (char*)&jid, sizeof(uuid_t)); - for (size_t i=0; iactive_jobs[i]; - if (memcmp(job->id, jid, sizeof(uuid_t)) == 0) - return job; - } - return NULL; -} - -static void -pool_clients_send_job() -{ - client_t *c = pool_clients.clients; - for (size_t i = 0; i < pool_clients.count; i++, c++) - { - if (c->fd == 0) - continue; - client_send_job(c, false); - } -} - -static void -target_to_hex(uint64_t target, char *target_hex) -{ - if (target & 0xFFFFFFFF00000000) - { - log_debug("High target requested: %"PRIu64, target); - bin_to_hex((const char*)&target, 8, &target_hex[0], 16); - target_hex[16] = '\0'; - return; - } - BIGNUM *diff = BN_new(); - BIGNUM *bnt = BN_new(); -#ifdef SIXTY_FOUR_BIT_LONG - BN_set_word(bnt, target); -#else - char st[24]; - snprintf(st, 24, "%"PRIu64, target); - BN_dec2bn(&bnt, st); -#endif - BN_div(diff, NULL, base_diff, bnt, bn_ctx); - BN_rshift(diff, diff, 224); - uint32_t w = BN_get_word(diff); - bin_to_hex((const char*)&w, 4, &target_hex[0], 8); - target_hex[8] = '\0'; - BN_free(bnt); - BN_free(diff); -} - -static void -stratum_get_proxy_job_body(char *body, const client_t *client, const char *block_hex, bool response) -{ - int json_id = client->json_id; - const char *client_id = client->client_id; - const job_t *job = &client->active_jobs[0]; - char job_id[33]; - bin_to_hex((const char*)job->id, sizeof(uuid_t), job_id, 32); - uint64_t target = job->target; - char target_hex[17]; - target_to_hex(target, &target_hex[0]); - const block_template_t *bt = job->block_template; - - if (response) - { - snprintf(body, JOB_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\",\"error\":null,\"result\"" - ":{\"id\":\"%.32s\",\"job\":{\"blocktemplate_blob\":\"%s\",\"job_id\":\"%.32s\"," - "\"difficulty\":%"PRIu64",\"height\":%"PRIu64",\"reserved_offset\":%u,\"client_nonce_offset\":%u," - "\"client_pool_offset\":%u,\"target_diff\":%"PRIu64",\"target_diff_hex\":\"%s\"}," - "\"status\":\"OK\"}}\n", json_id, client_id, block_hex, job_id, - bt->difficulty, bt->height, bt->reserved_offset, bt->reserved_offset + 12, - bt->reserved_offset + 8, target, target_hex); - } - else - { - snprintf(body, JOB_BODY_MAX, "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\"" - ":{\"id\":\"%.32s\",\"job\":{\"blocktemplate_blob\":\"%s\",\"job_id\":\"%.32s\"," - "\"difficulty\":%"PRIu64",\"height\":%"PRIu64",\"reserved_offset\":%u,\"client_nonce_offset\":%u," - "\"client_pool_offset\":%u,\"target_diff\":%"PRIu64",\"target_diff_hex\":\"%s\"}," - "\"status\":\"OK\"}}\n", client_id, block_hex, job_id, - bt->difficulty, bt->height, bt->reserved_offset, bt->reserved_offset + 12, - bt->reserved_offset + 8, target, target_hex); - } -} - -static void -stratum_get_job_body(char *body, const client_t *client, bool response) -{ - int json_id = client->json_id; - const char *client_id = client->client_id; - const job_t *job = &client->active_jobs[0]; - char job_id[33]; - bin_to_hex((const char*)job->id, sizeof(uuid_t), job_id, 32); - const char *blob = job->blob; - uint64_t target = job->target; - uint64_t height = job->block_template->height; - char target_hex[17]; - target_to_hex(target, &target_hex[0]); - - if (response) - { - snprintf(body, JOB_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\",\"error\":null,\"result\"" - ":{\"id\":\"%.32s\",\"job\":{" - "\"blob\":\"%s\",\"job_id\":\"%.32s\",\"target\":\"%s\",\"height\":%"PRIu64"}," - "\"status\":\"OK\"}}\n", json_id, client_id, blob, job_id, target_hex, height); - } - else - { - snprintf(body, JOB_BODY_MAX, "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\"" - ":{\"id\":\"%.32s\",\"blob\":\"%s\",\"job_id\":\"%.32s\",\"target\":\"%s\"," - "\"height\":%"PRIu64"}}\n", - client_id, blob, job_id, target_hex, height); - } -} - -static inline void -stratum_get_error_body(char *body, int json_id, const char *error) -{ - snprintf(body, ERROR_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\",\"error\":" - "{\"code\":-1, \"message\":\"%s\"}}\n", json_id, error); -} - -static inline void -stratum_get_status_body(char *body, int json_id, const char *status) -{ - snprintf(body, STATUS_BODY_MAX, "{\"id\":%d,\"jsonrpc\":\"2.0\",\"error\":null,\"result\":{\"status\":\"%s\"}}\n", - json_id, status); -} - -static void -response_to_block_template(json_object *result, block_template_t *block_template) -{ - JSON_GET_OR_WARN(blockhashing_blob, result, json_type_string); - JSON_GET_OR_WARN(blocktemplate_blob, result, json_type_string); - JSON_GET_OR_WARN(difficulty, result, json_type_int); - JSON_GET_OR_WARN(height, result, json_type_int); - JSON_GET_OR_WARN(prev_hash, result, json_type_string); - JSON_GET_OR_WARN(reserved_offset, result, json_type_int); - block_template->blockhashing_blob = strdup(json_object_get_string(blockhashing_blob)); - block_template->blocktemplate_blob = strdup(json_object_get_string(blocktemplate_blob)); - block_template->difficulty = json_object_get_int64(difficulty); - block_template->height = json_object_get_int64(height); - memcpy(block_template->prev_hash, json_object_get_string(prev_hash), 64); - block_template->reserved_offset = json_object_get_int(reserved_offset); -} - -static void -response_to_block(json_object *block_header, block_t *block) -{ - memset(block, 0, sizeof(block_t)); - JSON_GET_OR_WARN(height, block_header, json_type_int); - JSON_GET_OR_WARN(difficulty, block_header, json_type_int); - JSON_GET_OR_WARN(hash, block_header, json_type_string); - JSON_GET_OR_WARN(prev_hash, block_header, json_type_string); - JSON_GET_OR_WARN(timestamp, block_header, json_type_int); - JSON_GET_OR_WARN(reward, block_header, json_type_int); - JSON_GET_OR_WARN(orphan_status, block_header, json_type_boolean); - block->height = json_object_get_int64(height); - block->difficulty = json_object_get_int64(difficulty); - memcpy(block->hash, json_object_get_string(hash), 64); - memcpy(block->prev_hash, json_object_get_string(prev_hash), 64); - block->timestamp = json_object_get_int64(timestamp); - block->reward = json_object_get_int64(reward); - if (json_object_get_int(orphan_status)) - block->status |= BLOCK_ORPHANED; -} - -static void -rpc_get_request_body(char *body, const char* method, char* fmt, ...) -{ - char *pb = body; - - snprintf(pb, RPC_BODY_MAX, "%s%s%s", "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"", method, "\""); - pb += strlen(pb); - - if (fmt && *fmt) - { - char *s; - uint64_t d; - snprintf(pb, RPC_BODY_MAX - strlen(body), "%s", ",\"params\":{"); - pb += strlen(pb); - va_list args; - va_start(args, fmt); - uint8_t count = 0; - while (*fmt) - { - switch (*fmt++) - { - case 's': - s = va_arg(args, char *); - snprintf(pb, RPC_BODY_MAX - strlen(body), "\"%s\"", s); - pb += strlen(pb); - break; - case 'd': - d = va_arg(args, uint64_t); - snprintf(pb, RPC_BODY_MAX - strlen(body), "%"PRIu64, d); - pb += strlen(pb); - break; - } - *pb++ = count++ % 2 ? ',' : ':'; - } - va_end(args); - *--pb = '}'; - pb++; - } - *pb++ = '}'; - *pb = '\0'; - log_trace("Payload: %s", body); -} - -static void -rpc_on_response(struct evhttp_request *req, void *arg) -{ - struct evbuffer *input; - rpc_callback_t *callback = (rpc_callback_t*) arg; - - if (!req) - { - log_error("Request failure. Aborting."); - rpc_callback_free(callback); - return; - } - - int rc = evhttp_request_get_response_code(req); - if (rc < 200 || rc >= 300) - { - log_error("HTTP status code %d for %s. Aborting.", - rc, evhttp_request_get_uri(req)); - rpc_callback_free(callback); - return; - } - - input = evhttp_request_get_input_buffer(req); - size_t len = evbuffer_get_length(input); - char body[len+1]; - evbuffer_remove(input, body, len); - body[len] = '\0'; - callback->f(body, callback); - rpc_callback_free(callback); -} - -static void -rpc_request(struct event_base *base, const char *body, rpc_callback_t *callback) -{ - struct evhttp_connection *con; - struct evhttp_request *req; - struct evkeyvalq *headers; - struct evbuffer *output; - - con = evhttp_connection_base_new(base, NULL, config.rpc_host, config.rpc_port); - evhttp_connection_free_on_completion(con); - evhttp_connection_set_timeout(con, config.rpc_timeout); - req = evhttp_request_new(rpc_on_response, callback); - output = evhttp_request_get_output_buffer(req); - evbuffer_add(output, body, strlen(body)); - headers = evhttp_request_get_output_headers(req); - evhttp_add_header(headers, "Content-Type", "application/json"); - evhttp_add_header(headers, "Connection", "close"); - evhttp_make_request(con, req, EVHTTP_REQ_POST, RPC_PATH); -} - -static void -rpc_wallet_request(struct event_base *base, const char *body, rpc_callback_t *callback) -{ - struct evhttp_connection *con; - struct evhttp_request *req; - struct evkeyvalq *headers; - struct evbuffer *output; - - con = evhttp_connection_base_new(base, NULL, config.wallet_rpc_host, config.wallet_rpc_port); - evhttp_connection_free_on_completion(con); - evhttp_connection_set_timeout(con, config.rpc_timeout); - req = evhttp_request_new(rpc_on_response, callback); - output = evhttp_request_get_output_buffer(req); - evbuffer_add(output, body, strlen(body)); - headers = evhttp_request_get_output_headers(req); - evhttp_add_header(headers, "Content-Type", "application/json"); - evhttp_add_header(headers, "Connection", "close"); - evhttp_make_request(con, req, EVHTTP_REQ_POST, RPC_PATH); -} - -static void -rpc_on_block_headers_range(const char* data, rpc_callback_t *callback) -{ - json_object *root = json_tokener_parse(data); - JSON_GET_OR_WARN(result, root, json_type_object); - JSON_GET_OR_WARN(status, result, json_type_string); - const char *ss = json_object_get_string(status); - json_object *error = NULL; - json_object_object_get_ex(root, "error", &error); - if (error != NULL) - { - JSON_GET_OR_WARN(code, error, json_type_object); - JSON_GET_OR_WARN(message, error, json_type_string); - int ec = json_object_get_int(code); - const char *em = json_object_get_string(message); - log_warn("Error (%d) getting block headers by range: %s", ec, em); - json_object_put(root); - return; - } - if (status == NULL || strcmp(ss, "OK") != 0) - { - log_warn("Error getting block headers by range: %s", ss); - json_object_put(root); - return; - } - - JSON_GET_OR_WARN(headers, result, json_type_array); - size_t headers_len = json_object_array_length(headers); - assert(headers_len == BLOCK_HEADERS_RANGE); - for (int i=0; iheight - 60; uint64_t start = end - BLOCK_HEADERS_RANGE + 1; - rpc_get_request_body(body, "get_block_headers_range", "sdsd", "start_height", start, "end_height", end); + rpc_get_request_body(body, "get_block_headers_range", "sdsd", + "start_height", start, "end_height", end); rpc_callback_t *cb2 = rpc_callback_new( rpc_on_block_headers_range, NULL); rpc_request(base, body, cb2); @@ -1609,6 +1637,104 @@ cleanup: json_object_put(root); } +static int +send_payments() +{ + uint64_t threshold = 1000000000000 * config.payment_threshold; + int rc; + char *err; + MDB_txn *txn; + MDB_cursor *cursor; + if ((rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn)) != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); + return rc; + } + if ((rc = mdb_cursor_open(txn, db_balance, &cursor)) != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); + mdb_txn_abort(txn); + return rc; + } + + size_t payments_count = 0; + size_t payments_max_count = 25; + size_t payments_size = payments_max_count * sizeof(payment_t); + payment_t *payments = (payment_t*) calloc(1, payments_size); + memset(payments, 0, payments_size); + payment_t *payment = payments; + payment_t *end_payment = payment + payments_max_count; + + MDB_cursor_op op = MDB_FIRST; + while (1) + { + MDB_val key; + MDB_val val; + rc = mdb_cursor_get(cursor, &key, &val, op); + op = MDB_NEXT; + if (rc != 0) + break; + + const char *address = (const char*)key.mv_data; + uint64_t amount = *(uint64_t*)val.mv_data; + + if (amount < threshold) + continue; + + log_info("Sending payment of %"PRIu64" to %s\n", amount, address); + + memcpy(payment->address, address, ADDRESS_MAX); + payment->amount = amount; + payments_count++; + + if (++payment == end_payment) + { + payments_size <<= 1; + payments = (payment_t*) realloc(payments, payments_size); + payment = payments + payments_max_count; + memset(payment, 0, sizeof(payment_t) * payments_max_count); + payments_max_count <<= 1; + end_payment = payments + payments_max_count; + } + } + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + + if (payments_count) + { + size_t body_size = 160 * payments_count + 128; + char body[body_size]; + char *start = body; + char *end = body + body_size; + start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":" + "\"transfer_split\",\"params\":{" + "\"ring_size\":11,\"destinations\":[", end); + for (size_t i=0; iaddress, end); + start = stecpy(start, "\",\"amount\":", end); + sprintf(start, "%"PRIu64"}", p->amount); + start = body + strlen(body); + if (i != payments_count -1) + start = stecpy(start, ",", end); + else + start = stecpy(start, "]}}", end); + } + log_trace(body); + rpc_callback_t *cb = rpc_callback_new( + rpc_on_wallet_transferred, payments); + rpc_wallet_request(base, body, cb); + } + else + free(payments); + + return 0; +} + static void fetch_last_block_header() { @@ -1654,7 +1780,8 @@ client_add(int fd, struct bufferevent *bev) { pthread_mutex_lock(&mutex_clients); pool_clients.count += POOL_CLIENTS_GROW; - c = realloc(pool_clients.clients, sizeof(client_t) * pool_clients.count); + c = realloc(pool_clients.clients, sizeof(client_t) * + pool_clients.count); pool_clients.clients = c; c += pool_clients.count - POOL_CLIENTS_GROW; pthread_mutex_unlock(&mutex_clients); @@ -1696,134 +1823,6 @@ client_clear(struct bufferevent *bev) pool_stats.connected_miners--; } -static void -client_send_job(client_t *client, bool response) -{ - /* First cycle jobs */ - job_t *last = &client->active_jobs[CLIENT_JOBS_MAX-1]; - if (last->blob != NULL) - { - free(last->blob); - last->blob = NULL; - } - if (last->submissions != NULL) - { - free(last->submissions); - last->submissions = NULL; - last->submissions_count = 0; - } - for (size_t i=CLIENT_JOBS_MAX-1; i>0; i--) - { - job_t *current = &client->active_jobs[i]; - job_t *prev = &client->active_jobs[i-1]; - memcpy(current, prev, sizeof(job_t)); - } - job_t *job = &client->active_jobs[0]; - memset(job, 0, sizeof(job_t)); - - /* Quick check we actually have a block template */ - block_template_t *bt = bstack_peek(bst); - if (!bt) - { - log_warn("Cannot send client a job as have not yet recieved a block template"); - return; - } - - /* - 1. Convert blocktemplate_blob to binary - 2. Update bytes in reserved space at reserved_offset - 3. Get block hashing blob for job - 4. Send - */ - - /* Convert template to blob */ - size_t bin_size = strlen(bt->blocktemplate_blob) >> 1; - char *block = calloc(bin_size, sizeof(char)); - hex_to_bin(bt->blocktemplate_blob, block, bin_size); - - /* Set the extra nonce in our reserved space */ - char *p = block; - p += bt->reserved_offset; - ++extra_nonce; - memcpy(p, &extra_nonce, sizeof(extra_nonce)); - job->extra_nonce = extra_nonce; - - /* Get hashong blob */ - size_t hashing_blob_size; - char *hashing_blob = NULL; - get_hashing_blob(block, bin_size, &hashing_blob, &hashing_blob_size); - - /* Make hex */ - job->blob = calloc((hashing_blob_size << 1) +1, sizeof(char)); - bin_to_hex(hashing_blob, hashing_blob_size, job->blob, hashing_blob_size << 1); - log_trace("Miner hashing blob: %s", job->blob); - - /* Save a job id */ - uuid_generate(job->id); - - /* Hold reference to block template */ - job->block_template = bt; - - /* Send */ - char job_id[33]; - bin_to_hex((const char*)job->id, sizeof(uuid_t), job_id, 32); - - /* Retarget */ - double duration = difftime(time(NULL), client->connected_since); - uint8_t retarget_time = client->is_proxy ? 5 : 120; - uint64_t target = fmax((double)client->hashes / duration * retarget_time, config.pool_start_diff); - job->target = target; - log_debug("Client %.32s target now %"PRIu64, client->client_id, target); - - char body[JOB_BODY_MAX]; - if (!client->is_proxy) - { - stratum_get_job_body(body, client, response); - } - else - { - char *block_hex = calloc(bin_size+1, sizeof(char)); - bin_to_hex(block, bin_size, block_hex, bin_size << 1); - stratum_get_proxy_job_body(body, client, block_hex, response); - free(block_hex); - } - log_trace("Client job: %s", body); - struct evbuffer *output = bufferevent_get_output(client->bev); - evbuffer_add(output, body, strlen(body)); - free(block); - free(hashing_blob); -} - -static void -client_clear_jobs(client_t *client) -{ - for (size_t i=0; iactive_jobs[i]; - if (job->blob != NULL) - { - free(job->blob); - job->blob = NULL; - } - if (job->submissions != NULL) - { - free(job->submissions); - job->submissions = NULL; - job->submissions_count = 0; - } - } -} - -static void -send_validation_error(const client_t *client, const char *message) -{ - struct evbuffer *output = bufferevent_get_output(client->bev); - char body[ERROR_BODY_MAX]; - stratum_get_error_body(body, client->json_id, message); - evbuffer_add(output, body, strlen(body)); - log_debug("Validation error: %s", message); -} - static void client_on_login(json_object *message, client_t *client) { @@ -1835,7 +1834,8 @@ client_on_login(json_object *message, client_t *client) uint64_t prefix; parse_address(address, &prefix); if (prefix != MAINNET_ADDRESS_PREFIX && prefix != TESTNET_ADDRESS_PREFIX) - return send_validation_error(client, "login only main wallet addresses are supported"); + return send_validation_error(client, + "login only main wallet addresses are supported"); const char *worker_id = json_object_get_string(pass); @@ -1846,7 +1846,8 @@ client_on_login(json_object *message, client_t *client) if (user_agent) { strncpy(client->agent, user_agent, 255); - client->is_proxy = strstr(user_agent, "proxy") != NULL ? true : false; + client->is_proxy = strstr(user_agent, "proxy") != NULL + ? true : false; } } @@ -1892,7 +1893,8 @@ client_on_submit(json_object *message, client_t *client) if (!job) return send_validation_error(client, "cannot find job with job_id"); - log_trace("Client submitted nonce=%u, result=%s", result_nonce, result_hex); + log_trace("Client submitted nonce=%u, result=%s", + result_nonce, result_hex); /* 1. Validate submission active_job->blocktemplate_blob to bin @@ -1904,8 +1906,8 @@ client_on_submit(json_object *message, client_t *client) check result hash against block difficulty (if ge then mined block) check result hash against target difficulty (if not ge, invalid share) 2. Process share - check result hash against template difficulty (submit to network if good) - add share to db + check result hash against template difficulty + (submit to network if good) add share to db Note reserved space is: extra_nonce, instance_id, pool_nonce, worker_nonce 4 bytes each. instance_id would be used for pool threads. @@ -1948,7 +1950,8 @@ client_on_submit(json_object *message, client_t *client) *psub++ = worker_nonce; psub -= 4; - log_trace("Submission reserved values: %u %u %u %u", *psub, *(psub+1), *(psub+2), *(psub+3)); + log_trace("Submission reserved values: %u %u %u %u", + *psub, *(psub+1), *(psub+2), *(psub+3)); /* Check not already submitted */ uint128_t *submissions = job->submissions; @@ -1976,7 +1979,8 @@ client_on_submit(json_object *message, client_t *client) /* Get hashong blob */ size_t hashing_blob_size; char *hashing_blob = NULL; - if (get_hashing_blob(block, bin_size, &hashing_blob, &hashing_blob_size) != 0) + if (get_hashing_blob(block, bin_size, + &hashing_blob, &hashing_blob_size) != 0) { char body[ERROR_BODY_MAX]; stratum_get_error_body(body, client->json_id, "Invalid block"); @@ -1991,7 +1995,8 @@ client_on_submit(json_object *message, client_t *client) char submitted_hash[32]; uint8_t major_version = (uint8_t)block[0]; const int cn_variant = major_version >= 7 ? major_version - 6 : 0; - get_hash(hashing_blob, hashing_blob_size, (char**)&result_hash, cn_variant, bt->height); + get_hash(hashing_blob, hashing_blob_size, + (char**)&result_hash, cn_variant, bt->height); hex_to_bin(result_hex, submitted_hash, 32); if (memcmp(submitted_hash, result_hash, 32) != 0) @@ -2020,7 +2025,8 @@ client_on_submit(json_object *message, client_t *client) /* Process share */ client->hashes += job->target; time_t now = time(NULL); - log_trace("Checking hash against blobk difficulty: %lu, job difficulty: %lu", + log_trace("Checking hash against blobk difficulty: " + "%lu, job difficulty: %lu", BN_get_word(bd), BN_get_word(jd)); bool can_store = false; @@ -2033,13 +2039,14 @@ client_on_submit(json_object *message, client_t *client) bin_to_hex(block, bin_size, block_hex, bin_size << 1); char body[RPC_BODY_MAX]; snprintf(body, RPC_BODY_MAX, - "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"submit_block\", \"params\":[\"%s\"]}", + "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":" + "\"submit_block\", \"params\":[\"%s\"]}", block_hex); rpc_callback_t *cb = rpc_callback_new(rpc_on_block_submitted, NULL); cb->data = calloc(1, sizeof(block_t)); - block_t* b = (block_t*)cb->data; + block_t* b = (block_t*) cb->data; b->height = bt->height; bin_to_hex(submitted_hash, 32, b->hash, 64); memcpy(b->prev_hash, bt->prev_hash, 64); @@ -2247,7 +2254,8 @@ read_config(const char *config_file, const char *log_file, bool block_notified) strcat(path, "/pool.conf"); if (access(path, R_OK) != 0) { - log_fatal("Cannot find a config file in ./ or ~/ and no option supplied. Aborting."); + log_fatal("Cannot find a config file in ./ or ~/ " + "and no option supplied. Aborting."); abort(); } } @@ -2346,18 +2354,37 @@ read_config(const char *config_file, const char *log_file, bool block_notified) } if (!config.wallet_rpc_host[0] || config.wallet_rpc_port == 0) { - log_fatal("Both wallet-rpc-host and wallet-rpc-port need setting. Aborting."); + log_fatal("Both wallet-rpc-host and wallet-rpc-port need setting. " + "Aborting."); abort(); } - log_info("\nCONFIG:\n rpc_host = %s\n rpc_port = %u\n rpc_timeout = %u\n pool_wallet = %s\n " - "pool_start_diff = %"PRIu64"\n share_mul = %.2f\n pool_fee = %.2f\n payment_threshold = %.2f\n " + log_info("\nCONFIG:\n rpc_host = %s\n rpc_port = %u\n " + "rpc_timeout = %u\n pool_wallet = %s\n " + "pool_start_diff = %"PRIu64"\n share_mul = %.2f\n " + "pool_fee = %.2f\n payment_threshold = %.2f\n " "wallet_rpc_host = %s\n wallet_rpc_port = %u\n pool_port = %u\n " - "log_level = %u\n webui_port=%u\n log-file = %s\n block-notified = %u\n", + "log_level = %u\n webui_port=%u\n " + "log-file = %s\n block-notified = %u\n", config.rpc_host, config.rpc_port, config.rpc_timeout, config.pool_wallet, config.pool_start_diff, config.share_mul, config.pool_fee, config.payment_threshold, config.wallet_rpc_host, config.wallet_rpc_port, config.pool_port, - config.log_level, config.webui_port, config.log_file, config.block_notified); + config.log_level, config.webui_port, + config.log_file, config.block_notified); +} + +static void +sigusr1_handler(evutil_socket_t fd, short event, void *arg) +{ + log_trace("Fetching last block header from signal"); + fetch_last_block_header(); +} + +static void +sigint_handler(int sig) +{ + signal(SIGINT, SIG_DFL); + exit(0); } static void @@ -2399,7 +2426,8 @@ run(void) return; } - listener_event = event_new(base, listener, EV_READ|EV_PERSIST, client_on_accept, (void*)base); + listener_event = event_new(base, listener, EV_READ|EV_PERSIST, + client_on_accept, (void*)base); if (event_add(listener_event, NULL) != 0) { log_fatal("Failed to add socket listener event"); @@ -2448,20 +2476,6 @@ cleanup() fclose(fd_log); } -static void -sigusr1_handler(evutil_socket_t fd, short event, void *arg) -{ - log_trace("Fetching last block header from signal"); - fetch_last_block_header(); -} - -static void -sigint_handler(int sig) -{ - signal(SIGINT, SIG_DFL); - exit(0); -} - int main(int argc, char **argv) { setvbuf(stdout, NULL, _IONBF, 0); @@ -2535,7 +2549,8 @@ int main(int argc, char **argv) bn_ctx = BN_CTX_new(); base_diff = NULL; - BN_hex2bn(&base_diff, "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"); + BN_hex2bn(&base_diff, + "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"); pool_clients_init(); diff --git a/src/webui.c b/src/webui.c index 87deefa..dc642cc 100644 --- a/src/webui.c +++ b/src/webui.c @@ -75,7 +75,8 @@ send_json_stats (void *cls, struct MHD_Connection *connection) uint32_t pbf = context->pool_stats->pool_blocks_found; uint64_t mh = 0; double mb = 0.0; - const char *wa = MHD_lookup_connection_value(connection, MHD_COOKIE_KIND, "wa"); + const char *wa = MHD_lookup_connection_value(connection, + MHD_COOKIE_KIND, "wa"); if (wa != NULL) { mh = miner_hr(wa); @@ -127,7 +128,8 @@ int start_web_ui(wui_context_t *context) { log_debug("Starting Web UI"); - mhd_daemon = MHD_start_daemon(MHD_USE_SELECT_INTERNALLY, context->port, NULL, NULL, + mhd_daemon = MHD_start_daemon(MHD_USE_SELECT_INTERNALLY, + context->port, NULL, NULL, &answer_to_connection, (void*) context, MHD_OPTION_END); return mhd_daemon != NULL ? 0 : -1; } diff --git a/src/xmr.cpp b/src/xmr.cpp index 8499600..6c5b4e7 100644 --- a/src/xmr.cpp +++ b/src/xmr.cpp @@ -50,7 +50,8 @@ using namespace cryptonote; -int get_hashing_blob(const char *input, const size_t in_size, char **output, size_t *out_size) +int get_hashing_blob(const char *input, const size_t in_size, + char **output, size_t *out_size) { block b = AUTO_VAL_INIT(b); blobdata bd = std::string(input, in_size); @@ -79,8 +80,10 @@ int parse_address(const char *input, uint64_t *prefix) return rv ? 0 : -1; } -void get_hash(const char *input, const size_t in_size, char **output, int variant, uint64_t height) +void get_hash(const char *input, const size_t in_size, + char **output, int variant, uint64_t height) { - crypto::cn_slow_hash(input, in_size, reinterpret_cast(*output), variant, height); + crypto::cn_slow_hash(input, in_size, + reinterpret_cast(*output), variant, height); } diff --git a/src/xmr.h b/src/xmr.h index 79d04d6..3b13bfc 100644 --- a/src/xmr.h +++ b/src/xmr.h @@ -37,9 +37,11 @@ extern "C" { #endif -int get_hashing_blob(const char *input, const size_t in_size, char **output, size_t *out_size); +int get_hashing_blob(const char *input, const size_t in_size, + char **output, size_t *out_size); int parse_address(const char *input, uint64_t *prefix); -void get_hash(const char *input, const size_t in_size, char **output, int variant, uint64_t height); +void get_hash(const char *input, const size_t in_size, + char **output, int variant, uint64_t height); #ifdef __cplusplus }