diff --git a/src/pool.c b/src/pool.c index 5bddec6..7665d20 100644 --- a/src/pool.c +++ b/src/pool.c @@ -2170,14 +2170,16 @@ upstream_send_ping() } static void -upstream_send_client_connect() +upstream_send_client_connect(uint32_t count) { struct evbuffer *output = bufferevent_get_output(upstream_event); - char data[9]; + size_t z = 9 + sizeof(uint32_t); + char data[z]; int t = BIN_CONNECT; memcpy(data, msgbin, 8); memcpy(data+8, &t, 1); - evbuffer_add(output, data, 9); + memcpy(data+9, &count, z-9); + evbuffer_add(output, data, z); log_trace("Sending message connect upstream"); } @@ -2322,19 +2324,23 @@ upstream_send_backlog() mdb_cursor_close(curshr); mdb_cursor_close(curblk); mdb_txn_abort(txn); + upstream_send_client_connect(pool_stats.connected_miners); } static void trusted_on_client_connect(client_t *client) { - pool_stats.connected_miners++; - client->downstream_miners++; + struct evbuffer *input = bufferevent_get_input(client->bev); + uint32_t count; + evbuffer_remove(input, &count, sizeof(uint32_t)); + pool_stats.connected_miners += count; + client->downstream_miners += count; log_trace("Downstream miner connected. " "Miner count: %d, Pool hashrate: %"PRIu64, pool_stats.connected_miners, pool_stats.pool_hashrate); trusted_send_stats(client); if (upstream_event) - upstream_send_client_connect(); + upstream_send_client_connect(count); } static void @@ -2457,20 +2463,25 @@ upstream_on_read(struct bufferevent *bev, void *ctx) return; } - evbuffer_remove(input, tnt, 9); - + evbuffer_copyout(input, tnt, 9); switch (tnt[8]) { case BIN_STATS: + if (len - 9 < sizeof(pool_stats_t)) + return; + evbuffer_drain(input, 9); upstream_on_stats(bev); break; case BIN_BALANCE: + if (len - 9 < sizeof(uint64_t)+ADDRESS_MAX) + return; + evbuffer_drain(input, 9); upstream_on_balance(bev); break; default: log_error("Unsupported message type: %d", tnt[8]); - evbuffer_drain(input, evbuffer_get_length(input)); + evbuffer_drain(input, len); return; } } @@ -2610,7 +2621,7 @@ client_add(int fd, struct bufferevent *bev, bool downstream) if (!downstream) pool_stats.connected_miners++; if (upstream_event) - upstream_send_client_connect(); + upstream_send_client_connect(1); } static void @@ -3234,24 +3245,35 @@ trusted_on_read(struct bufferevent *bev, void *ctx) return; } - evbuffer_remove(input, tnt, 9); + evbuffer_copyout(input, tnt, 9); log_trace("Downstream message: %d", tnt[8]); switch (tnt[8]) { case BIN_PING: case BIN_STATS: + evbuffer_drain(input, 9); trusted_send_stats(client); break; case BIN_CONNECT: + if (len - 9 < sizeof(uint32_t)) + return; + evbuffer_drain(input, 9); trusted_on_client_connect(client); break; case BIN_DISCONNECT: + evbuffer_drain(input, 9); trusted_on_client_disconnect(client); break; case BIN_SHARE: + if (len - 9 < sizeof(share_t)) + return; + evbuffer_drain(input, 9); trusted_on_client_share(client); break; case BIN_BLOCK: + if (len - 9 < sizeof(block_t)) + return; + evbuffer_drain(input, 9); trusted_on_client_block(client); break; default: