From 70f865798f16488caab0af09fec0408adf8b2ab0 Mon Sep 17 00:00:00 2001 From: Jethro Grassie Date: Wed, 2 Sep 2020 18:11:27 -0400 Subject: [PATCH] allow listen socket reuse plus log client IP --- src/pool.c | 79 +++++++++++++++++++++++++++++++++++++---------------- src/webui.c | 22 +++++++++++++-- 2 files changed, 76 insertions(+), 25 deletions(-) diff --git a/src/pool.c b/src/pool.c index 039318f..61ea3b8 100644 --- a/src/pool.c +++ b/src/pool.c @@ -211,6 +211,8 @@ typedef struct job_t typedef struct client_t { int fd; + char host[MAX_HOST]; + uint16_t port; int json_id; struct bufferevent *bev; char address[ADDRESS_MAX]; @@ -1271,7 +1273,8 @@ send_validation_error(const client_t *client, const char *message) char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, message); evbuffer_add(output, body, strlen(body)); - log_debug("Validation error: %s", message); + log_debug("[%s:%d] Validation error: %s", + client->host, client->port, message); } static void @@ -2491,12 +2494,14 @@ trusted_on_account_connect(client_t *client) evbuffer_remove(input, &count, sizeof(uint32_t)); pool_stats.connected_accounts += count; client->downstream_accounts += count; - log_trace("Downstream account connected. " - "Accounts: %d, Pool hashrate: %"PRIu64, - pool_stats.connected_accounts, pool_stats.pool_hashrate); + log_trace("Downstream account connected"); trusted_send_stats(client); if (upstream_event) upstream_send_account_connect(count); + log_trace("Pool accounts: %d, workers: %d, hashrate: %"PRIu64, + pool_stats.connected_accounts, + gbag_used(bag_clients), + pool_stats.pool_hashrate); } static void @@ -2505,12 +2510,14 @@ trusted_on_account_disconnect(client_t *client) pool_stats.connected_accounts--; if (client->downstream_accounts) client->downstream_accounts--; - log_trace("Downstream account disconnected. " - "Miner count: %d, Pool hashrate: %"PRIu64, - pool_stats.connected_accounts, pool_stats.pool_hashrate); + log_trace("Downstream account disconnected"); trusted_send_stats(client); if (upstream_event) upstream_send_account_disconnect(); + log_trace("Pool accounts: %d, workers: %d, hashrate: %"PRIu64, + pool_stats.connected_accounts, + gbag_used(bag_clients), + pool_stats.pool_hashrate); } static void @@ -2753,8 +2760,9 @@ timer_on_10m(int fd, short kind, void *ctx) evtimer_add(timer_10m, &timeout); } -static void -client_add(int fd, struct bufferevent *bev, bool downstream) +static const client_t * +client_add(int fd, struct sockaddr_storage *ss, + struct bufferevent *bev, bool downstream) { client_t *c; bool resize = gbag_used(bag_clients) == gbag_max(bag_clients); @@ -2774,10 +2782,23 @@ client_add(int fd, struct bufferevent *bev, bool downstream) c->bev = bev; c->connected_since = time(NULL); c->downstream = downstream; + int rc = 0; + if ((rc = getnameinfo((struct sockaddr*)ss, sizeof(ss), + c->host, MAX_HOST, NULL, 0, NI_NUMERICHOST))) + { + log_error("Error getting client address: %s", + gai_strerror(rc)); + } + else + { + struct sockaddr_in *sin = (struct sockaddr_in*) ss; + c->port = htons(sin->sin_port); + } bstack_new(&c->active_jobs, CLIENT_JOBS_MAX, sizeof(job_t), job_recycle); pthread_rwlock_wrlock(&rwlock_cfd); HASH_ADD_INT(clients_by_fd, fd, c); pthread_rwlock_unlock(&rwlock_cfd); + return c; } static void @@ -3146,7 +3167,7 @@ miner_on_submit(json_object *message, client_t *client) char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, "Duplicate share"); evbuffer_add(output, body, strlen(body)); - log_debug("Duplicate share"); + log_debug("[%s:%d] Duplicate share", client->host, client->port); free(block); return; } @@ -3351,7 +3372,7 @@ miner_on_read(struct bufferevent *bev, void *ctx) char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, too_long); evbuffer_add(output, body, strlen(body)); - log_info(too_long); + log_warn("[%s:%d] %s", client->host, client->port, too_long); evbuffer_drain(input, len); client_clear(bev); goto unlock; @@ -3366,7 +3387,7 @@ miner_on_read(struct bufferevent *bev, void *ctx) char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, invalid_json); evbuffer_add(output, body, strlen(body)); - log_info(invalid_json); + log_warn("[%s:%d] %s", client->host, client->port, invalid_json); evbuffer_drain(input, len); client_clear(bev); goto unlock; @@ -3417,7 +3438,7 @@ miner_on_read(struct bufferevent *bev, void *ctx) char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, unknown_method); evbuffer_add(output, body, strlen(body)); - log_info(unknown_method); + log_warn("[%s:%d] %s", client->host, client->port, unknown_method); evbuffer_drain(input, len); client_clear(bev); goto unlock; @@ -3427,7 +3448,7 @@ miner_on_read(struct bufferevent *bev, void *ctx) char body[ERROR_BODY_MAX] = {0}; stratum_get_error_body(body, client->json_id, too_bad); evbuffer_add(output, body, strlen(body)); - log_info(too_bad); + log_warn("[%s:%d] %s", client->host, client->port, too_bad); evbuffer_drain(input, len); client_clear(bev); goto unlock; @@ -3470,7 +3491,8 @@ trusted_on_read(struct bufferevent *bev, void *ctx) tag = evbuffer_search(input, (const char*) msgbin, 8, NULL); if (tag.pos < 0) { - log_error("Bad message from downstream"); + log_warn("[%s:%d] Bad message from downstream", + client->host, client->port); evbuffer_drain(input, len); client_clear(bev); goto unlock; @@ -3508,7 +3530,8 @@ trusted_on_read(struct bufferevent *bev, void *ctx) trusted_on_client_block(client); break; default: - log_error("Unknown message: %d", tnt[8]); + log_warn("[%s:%d] Unknown message: %d", + client->host, client->port, tnt[8]); evbuffer_drain(input, len); client_clear(bev); goto unlock; @@ -3525,18 +3548,23 @@ static void listener_on_error(struct bufferevent *bev, short error, void *ctx) { struct event_base *base = (struct event_base*)ctx; + client_t *client = NULL; + client_find(bev, &client); char *type = base != trusted_base ? "Miner" : "Downstream"; if (error & BEV_EVENT_EOF) { - log_debug("%s disconnected. Removing.", type); + log_debug("[%s:%d] %s disconnected. Removing.", + client->host, client->port, type); } else if (error & BEV_EVENT_ERROR) { - log_debug("%s error: %d. Removing.", type, errno); + log_debug("[%s:%d] %s error: %d. Removing.", + client->host, client->port, type, errno); } else if (error & BEV_EVENT_TIMEOUT) { - log_debug("%s timeout. Removing.", type); + log_debug("[%s:%d] %s timeout. Removing.", + client->host, client->port, type); } client_clear(bev); } @@ -3594,9 +3622,12 @@ listener_on_accept(evutil_socket_t listener, short event, void *arg) base == trusted_base ? trusted_on_read : miner_on_read, NULL, listener_on_error, arg); bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); - client_add(fd, bev, base == trusted_base); - log_info("New %s connected. Miner count: %d, Pool hashrate: %"PRIu64, - type, pool_stats.connected_accounts, pool_stats.pool_hashrate); + const client_t *c = client_add(fd, &ss, bev, base == trusted_base); + log_info("New %s [%s:%d] connected", type, c->host, c->port); + log_info("Pool accounts: %d, workers: %d, hashrate: %"PRIu64, + pool_stats.connected_accounts, + gbag_used(bag_clients), + pool_stats.pool_hashrate); bufferevent_enable(bev, EV_READ|EV_WRITE); } @@ -4019,6 +4050,7 @@ trusted_run(void *ctx) { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)); } #endif @@ -4083,6 +4115,7 @@ run(void) { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)); } #endif @@ -4315,7 +4348,7 @@ int main(int argc, char **argv) { fd_log = fopen(config.log_file, "a"); if (!fd_log) - log_info("Failed to open log file: %s", config.log_file); + log_warn("Failed to open log file: %s", config.log_file); else log_set_fp(fd_log); } diff --git a/src/webui.c b/src/webui.c index a49348f..f7cb0b2 100644 --- a/src/webui.c +++ b/src/webui.c @@ -45,6 +45,7 @@ developers. #include #include #include +#include #include "log.h" #include "pool.h" @@ -148,8 +149,25 @@ static void * thread_main(void *ctx) { wui_context_t *context = (wui_context_t*) ctx; - webui_listener = evhttp_bind_socket_with_handle( - webui_httpd, context->pool_listen, context->port); + struct evconnlistener *lev = NULL; + struct addrinfo *info = NULL; + int rc; + char port[6] = {0}; + sprintf(port, "%d", context->port); + if ((rc = getaddrinfo(context->pool_listen, port, 0, &info))) + { + log_error("Error parsing listen address: %s", gai_strerror(rc)); + return 0; + } + lev = evconnlistener_new_bind(webui_base, 0, NULL, + LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE | LEV_OPT_REUSEABLE_PORT, + -1, (struct sockaddr*)info->ai_addr, info->ai_addrlen); + if (!lev) + { + log_error("%s", strerror(errno)); + return 0; + } + webui_listener = evhttp_bind_listener(webui_httpd, lev); if(!webui_listener) { log_error("Failed to bind for port: %u", context->port);