stats: count by account instead of connections

This commit is contained in:
Jethro Grassie 2020-08-29 05:32:50 -04:00
parent d2f6c7d45b
commit 4fbbed9523
No known key found for this signature in database
GPG Key ID: DE8ED755616565BB
4 changed files with 1627 additions and 140 deletions

218
src/growbag.c Normal file
View File

@ -0,0 +1,218 @@
/*
Copyright (c) 2014-2020, The Monero Project
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Parts of the project are originally copyright (c) 2012-2013 The Cryptonote
developers.
*/
#include "growbag.h"
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
struct gbag_t
{
size_t z;
size_t max;
size_t ref;
char * b;
char * n;
char * ni;
gbag_recycle rc;
gbag_moved mv;
};
void
gbag_new(gbag_t **out, size_t count, size_t size,
gbag_recycle recycle, gbag_moved moved)
{
assert(*out==NULL && count && size);
gbag_t *gb = (gbag_t*) calloc(1, sizeof(gbag_t));
gb->z = size;
gb->max = count;
gb->ref = 0;
gb->b = (char*) calloc(gb->max, gb->z);
gb->n = gb->b;
gb->ni = gb->b;
gb->rc = recycle;
gb->mv = moved;
*out = gb;
}
void
gbag_free(gbag_t *gb)
{
assert(gb && gb->max && gb->b);
char *end = gb->b + (gb->max * gb->z);
char *cur = gb->b;
if (gb->rc)
{
while (cur < end)
{
gb->rc(cur);
cur += gb->z;
}
}
free(gb->b);
gb->max = 0;
gb->ref = 0;
gb->b = NULL;
gb->n = NULL;
gb->ni = NULL;
gb->rc = NULL;
gb->mv = NULL;
}
void *
gbag_get(gbag_t *gb)
{
assert(gb && gb->max && gb->b);
char *end = gb->b + (gb->max * gb->z);
char *from = gb->n;
size_t nm, cz;
char *b = NULL;
if (gb->ref == gb->max)
goto grow;
scan:
do
{
if (!*gb->n)
{
gb->ref++;
return gb->n;
}
gb->n += gb->z;
}
while(gb->n < end);
if (from != gb->b)
{
end = from;
gb->n = gb->b;
from = gb->n;
goto scan;
}
else
{
grow:
cz = gb->max * gb->z;
nm = gb->max << 1;
b = (char*) realloc(gb->b, nm * gb->z);
if (b == NULL)
return NULL;
memset(b + cz, 0, cz);
gb->max = nm;
if (gb->mv && gb->b != b)
gb->mv(b, cz);
gb->b = b;
gb->n = b + cz;
gb->ref++;
return gb->n;
}
return NULL;
}
void
gbag_put(gbag_t *gb, void *item)
{
assert(gb && item && gb->ref>0);
if (gb->rc)
gb->rc(item);
memset(item, 0, gb->z);
gb->n = (char*)item;
gb->ref--;
}
size_t
gbag_max(gbag_t *gb)
{
return gb->max;
}
size_t
gbag_used(gbag_t *gb)
{
return gb->ref;
}
void *
gbag_find(gbag_t *gb, const void *key, gbag_cmp cmp)
{
assert(gb && gb->b && gb->max);
return gbag_find_after(gb, key, cmp, NULL);
}
void *
gbag_find_after(gbag_t *gb, const void *key, gbag_cmp cmp, void *from)
{
assert(gb && gb->b && gb->max);
char *s = gb->b;
char *e = gb->b + (gb->max * gb->z);
if (from)
s = ((char*)from) + gb->z;
int c = (e-s)/gb->z;
return bsearch(key, s, c, gb->z, cmp);
}
void *
gbag_first(gbag_t *gb)
{
assert(gb && gb->b && gb->max);
char *s = gb->b;
char *e = gb->b + (gb->max * gb->z);
gb->ni = s;
do
{
if (*s)
return s;
s += gb->z;
}
while (s<e);
return NULL;
}
void *
gbag_next(gbag_t *gb, void* from)
{
assert(gb && gb->b && gb->max);
if (from)
gb->ni = ((char*)from) + gb->z;
char *e = gb->b + (gb->max * gb->z);
char *s = gb->ni;
while (s<e)
{
gb->ni += gb->z;
if (*s)
return s;
s += gb->z;
}
return NULL;
}

60
src/growbag.h Normal file
View File

@ -0,0 +1,60 @@
/*
Copyright (c) 2014-2020, The Monero Project
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Parts of the project are originally copyright (c) 2012-2013 The Cryptonote
developers.
*/
/* A bag of memory that exponentially grows */
#ifndef GBAG_H
#define GBAG_H
#include <stddef.h>
typedef struct gbag_t gbag_t;
typedef void (*gbag_recycle)(void*);
typedef void (*gbag_moved)(const void*,size_t);
typedef int (*gbag_cmp)(const void*,const void*);
void gbag_new(gbag_t **out, size_t count, size_t size,
gbag_recycle recycle, gbag_moved moved);
void gbag_free(gbag_t *gb);
void * gbag_get(gbag_t *gb);
void gbag_put(gbag_t *gb, void *item);
size_t gbag_max(gbag_t *gb);
size_t gbag_used(gbag_t *gb);
void * gbag_find(gbag_t *gb, const void *key, gbag_cmp cmp);
void * gbag_find_after(gbag_t *gb, const void *key, gbag_cmp cmp, void* from);
void * gbag_first(gbag_t *gb);
void * gbag_next(gbag_t *gb, void* from);
#endif

View File

@ -69,9 +69,11 @@ developers.
#include "log.h"
#include "webui.h"
#include "forkoff.h"
#include "growbag.h"
#include "uthash.h"
#define MAX_LINE 8192
#define POOL_CLIENTS_GROW 0x4000
#define CLIENTS_INIT 0x4000
#define RPC_BODY_MAX 8192
#define JOB_BODY_MAX 8192
#define ERROR_BODY_MAX 512
@ -212,13 +214,17 @@ typedef struct client_t
uint8_t bad_shares;
bool downstream;
uint32_t downstream_miners;
UT_hash_handle hh;
} client_t;
typedef struct pool_clients_t
typedef struct account_stats_t
{
client_t *clients;
size_t count;
} pool_clients_t;
char address[ADDRESS_MAX];
size_t worker_count;
time_t connected_since;
uint64_t hashes;
UT_hash_handle hh;
} account_stats_t;
typedef struct share_t
{
@ -248,14 +254,15 @@ typedef struct payment_t
typedef struct rpc_callback_t rpc_callback_t;
typedef void (*rpc_callback_fun)(const char*, rpc_callback_t*);
typedef void (*rpc_datafree_fun)(void*);
struct rpc_callback_t
{
rpc_callback_fun f;
rpc_callback_fun cf;
void *data;
rpc_datafree_fun df;
};
static config_t config;
static pool_clients_t pool_clients;
static bstack_t *bst;
static bstack_t *bsh;
static struct event_base *pool_base;
@ -281,6 +288,8 @@ static pthread_cond_t cond_clients = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mutex_log = PTHREAD_MUTEX_INITIALIZER;
static pthread_rwlock_t rwlock_tx = PTHREAD_RWLOCK_INITIALIZER;
static pthread_rwlock_t rwlock_acc = PTHREAD_RWLOCK_INITIALIZER;
static pthread_rwlock_t rwlock_cfd = PTHREAD_RWLOCK_INITIALIZER;
static FILE *fd_log;
static unsigned char sec_view[32];
static unsigned char pub_spend[32];
@ -293,6 +302,10 @@ static struct event *timer_10s;
static time_t upstream_last_time;
static uint64_t upstream_last_height;
static uint32_t miner_count;
static client_t *clients_by_fd = NULL;
static account_stats_t *account_stats = NULL;
static gbag_t *bag_stats;
static gbag_t *bag_clients;
#ifdef HAVE_RX
extern void rx_stop_mining();
@ -324,11 +337,12 @@ void rx_slow_hash_free_state(){}
}
static inline rpc_callback_t *
rpc_callback_new(rpc_callback_fun f, void *data)
rpc_callback_new(rpc_callback_fun cf, void *data, rpc_datafree_fun df)
{
rpc_callback_t *c = calloc(1, sizeof(rpc_callback_t));
c->f = f;
c->cf = cf;
c->data = data;
c->df = df;
return c;
}
@ -338,10 +352,21 @@ rpc_callback_free(rpc_callback_t *callback)
if (!callback)
return;
if (callback->data)
free(callback->data);
{
if (callback->df)
callback->df(callback->data);
else
free(callback->data);
}
free(callback);
}
static inline void
rpc_bag_free(void* data)
{
gbag_free((gbag_t*)data);
}
static int
compare_uint64(const MDB_val *a, const MDB_val *b)
{
@ -606,27 +631,19 @@ store_block(uint64_t height, block_t *block)
uint64_t
miner_hr(const char *address)
{
pthread_mutex_lock(&mutex_clients);
clients_reading++;
pthread_mutex_unlock(&mutex_clients);
client_t *c = pool_clients.clients;
uint64_t hr = 0;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since
&& !strncmp(c->address, address, ADDRESS_MAX))
{
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
continue;
hr += c->hashes / d;
continue;
}
}
pthread_mutex_lock(&mutex_clients);
clients_reading--;
pthread_cond_signal(&cond_clients);
pthread_mutex_unlock(&mutex_clients);
double d = 0.0;
account_stats_t *stats = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(account_stats, address, stats);
if (!stats || !stats->connected_since || !stats->hashes)
goto bail;
d = difftime(time(NULL), stats->connected_since);
if (d == 0.0)
goto bail;
hr = stats->hashes / d;
bail:
pthread_rwlock_unlock(&rwlock_acc);
return hr;
}
@ -937,16 +954,13 @@ static void
update_pool_hr(void)
{
uint64_t hr = 0;
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
client_t *c = (client_t*) gbag_first(bag_clients);
while ((c = gbag_next(bag_clients,NULL)))
{
if (c->connected_since)
{
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
continue;
hr += c->hashes / d;
}
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
continue;
hr += c->hashes / d;
}
log_debug("Pool hashrate: %"PRIu64, hr);
if (upstream_event)
@ -1344,10 +1358,34 @@ miner_send_job(client_t *client, bool response)
}
static void
pool_clients_send_job(void)
account_stats_moved(const void *items, size_t count)
{
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
account_stats_t *s, *e, *r;
s = (account_stats_t*) items;
e = s + count;
pthread_rwlock_wrlock(&rwlock_acc);
while (s<e)
HASH_REPLACE_STR(account_stats, address, s, r);
pthread_rwlock_unlock(&rwlock_acc);
}
static void
clients_moved(const void *items, size_t count)
{
client_t *s, *e, *r;
s = (client_t*) items;
e = s + count;
pthread_rwlock_wrlock(&rwlock_cfd);
while (s<e)
HASH_REPLACE_INT(clients_by_fd, fd, s, r);
pthread_rwlock_unlock(&rwlock_cfd);
}
static void
clients_send_job(void)
{
client_t *c = (client_t*) gbag_first(bag_clients);
while ((c = gbag_next(bag_clients,NULL)))
{
if (c->fd == 0 || c->address[0] == 0 || c->downstream)
continue;
@ -1356,27 +1394,36 @@ pool_clients_send_job(void)
}
static void
pool_clients_init(void)
clients_init(void)
{
assert(pool_clients.count == 0);
pool_clients.count = POOL_CLIENTS_GROW;
pool_clients.clients = (client_t*) calloc(pool_clients.count,
sizeof(client_t));
gbag_new(&bag_stats, CLIENTS_INIT, sizeof(account_stats_t), 0,
account_stats_moved);
gbag_new(&bag_clients, CLIENTS_INIT, sizeof(client_t), 0,
clients_moved);
}
static void
pool_clients_free(void)
clients_free(void)
{
if (!pool_clients.clients)
if (!(bag_stats && bag_clients))
return;
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
client_t *c = (client_t*) gbag_first(bag_clients);
while ((c = gbag_next(bag_clients,NULL)))
{
if (!c->active_jobs)
continue;
client_clear_jobs(c);
}
free(pool_clients.clients);
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_CLEAR(hh, clients_by_fd);
gbag_free(bag_clients);
pthread_rwlock_unlock(&rwlock_cfd);
pthread_rwlock_wrlock(&rwlock_acc);
HASH_CLEAR(hh, account_stats);
gbag_free(bag_stats);
pthread_rwlock_unlock(&rwlock_acc);
}
static void
@ -1464,7 +1511,7 @@ rpc_on_response(struct evhttp_request *req, void *arg)
char body[len+1];
evbuffer_remove(input, body, len);
body[len] = '\0';
callback->f(body, callback);
callback->cf(body, callback);
rpc_callback_free(callback);
}
@ -1659,7 +1706,7 @@ rpc_on_block_template(const char* data, rpc_callback_t *callback)
pool_stats.last_template_fetched = time(NULL);
block_template_t *top = (block_template_t*) bstack_push(bst, NULL);
response_to_block_template(result, top);
pool_clients_send_job();
clients_send_job();
json_object_put(root);
}
@ -1770,7 +1817,7 @@ startup_payout(uint64_t height)
rpc_get_request_body(body, "get_block_header_by_height", "sd",
"height", block->height);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_block_header_by_height, NULL);
rpc_on_block_header_by_height, 0, 0);
rpc_request(pool_base, body, cb);
}
@ -1862,7 +1909,7 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback)
uint64_t reserve = 17;
rpc_get_request_body(body, "get_block_template", "sssd",
"wallet_address", config.pool_wallet, "reserve_size", reserve);
rpc_callback_t *cb1 = rpc_callback_new(rpc_on_block_template, NULL);
rpc_callback_t *cb1 = rpc_callback_new(rpc_on_block_template, 0, 0);
rpc_request(pool_base, body, cb1);
uint64_t end = top->height - 60;
@ -1870,7 +1917,7 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback)
rpc_get_request_body(body, "get_block_headers_range", "sdsd",
"start_height", start, "end_height", end);
rpc_callback_t *cb2 = rpc_callback_new(
rpc_on_block_headers_range, NULL);
rpc_on_block_headers_range, 0, 0);
rpc_request(pool_base, body, cb2);
}
@ -1955,11 +2002,12 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn);
goto cleanup;
}
payment_t *payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
gbag_t *bag_pay = (gbag_t*) callback->data;
payment_t *p = (payment_t*) gbag_first(bag_pay);
while((p = gbag_next(bag_pay, 0)))
{
MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val key = {ADDRESS_MAX, (void*)p->address};
MDB_val val;
rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND)
@ -1975,21 +2023,21 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
}
uint64_t current_amount = *(uint64_t*)val.mv_data;
if (current_amount >= payment->amount)
if (current_amount >= p->amount)
{
current_amount -= payment->amount;
current_amount -= p->amount;
}
else
{
log_error("Payment was more than balance: %"PRIu64" > %"PRIu64,
payment->amount, current_amount);
p->amount, current_amount);
current_amount = 0;
}
if (error)
{
log_warn("Error seen on transfer for %s with amount %"PRIu64,
payment->address, payment->amount);
p->address, p->amount);
}
MDB_val new_val = {sizeof(current_amount), (void*)&current_amount};
rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
@ -2022,12 +2070,12 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
goto cleanup;
}
time_t now = time(NULL);
payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
p = (payment_t*) gbag_first(bag_pay);
while((p = gbag_next(bag_pay, 0)))
{
payment->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val = {sizeof(payment_t), payment};
p->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)p->address};
MDB_val val = {sizeof(payment_t), p};
if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0)
{
err = mdb_strerror(rc);
@ -2071,12 +2119,8 @@ send_payments(void)
return rc;
}
size_t payments_count = 0;
size_t payments_max_count = 25;
size_t payments_size = payments_max_count * sizeof(payment_t);
payment_t *payments = (payment_t*) calloc(1, payments_size);
payment_t *payment = payments;
payment_t *end_payment = payment + payments_max_count;
gbag_t *bag_pay;
gbag_new(&bag_pay, 25, sizeof(payment_t), 0, 0);
MDB_cursor_op op = MDB_FIRST;
while (1)
@ -2096,52 +2140,43 @@ send_payments(void)
log_info("Sending payment: %"PRIu64", %.8s", amount, address);
strncpy(payment->address, address, ADDRESS_MAX-1);
payment->amount = amount;
payments_count++;
if (++payment == end_payment)
{
payments_size <<= 1;
payments = (payment_t*) realloc(payments, payments_size);
payment = payments + payments_max_count;
memset(payment, 0, sizeof(payment_t) * payments_max_count);
payments_max_count <<= 1;
end_payment = payments + payments_max_count;
}
payment_t *p = (payment_t*) gbag_get(bag_pay);
strncpy(p->address, address, ADDRESS_MAX-1);
p->amount = amount;
}
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
if (payments_count)
size_t proc = gbag_used(bag_pay);
if (proc)
{
size_t body_size = 160 * payments_count + 128;
size_t body_size = 160 * proc + 128;
char body[body_size];
char *start = body;
char *end = body + body_size;
start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":"
"\"transfer_split\",\"params\":{"
"\"ring_size\":11,\"destinations\":[", end);
for (size_t i=0; i<payments_count; i++)
payment_t *p = (payment_t*) gbag_first(bag_pay);
while ((p = gbag_next(bag_pay, NULL)))
{
payment_t *p = &payments[i];
start = stecpy(start, "{\"address\":\"", end);
start = stecpy(start, p->address, end);
start = stecpy(start, "\",\"amount\":", end);
sprintf(start, "%"PRIu64"}", p->amount);
start = body + strlen(body);
if (i != payments_count -1)
if (--proc)
start = stecpy(start, ",", end);
else
start = stecpy(start, "]}}", end);
}
log_trace(body);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_wallet_transferred, payments);
rpc_on_wallet_transferred, bag_pay, rpc_bag_free);
rpc_wallet_request(pool_base, body, cb);
}
else
free(payments);
gbag_free(bag_pay);
return 0;
}
@ -2159,7 +2194,7 @@ fetch_view_key(void)
return;
char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "query_key", "ss", "key_type", "view_key");
rpc_callback_t *cb = rpc_callback_new(rpc_on_view_key, NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_view_key, 0, 0);
rpc_wallet_request(pool_base, body, cb);
}
@ -2169,7 +2204,7 @@ fetch_last_block_header(void)
log_info("Fetching last block header");
char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "get_last_block_header", NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_last_block_header, NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_last_block_header, 0, 0);
rpc_request(pool_base, body, cb);
}
@ -2686,42 +2721,28 @@ timer_on_10m(int fd, short kind, void *ctx)
static void
client_add(int fd, struct bufferevent *bev, bool downstream)
{
client_t *c = pool_clients.clients;
bool resize = true;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since == 0)
{
resize = false;
break;
}
}
client_t *c;
bool resize = gbag_used(bag_clients) == gbag_max(bag_clients);
if (resize)
{
pthread_mutex_lock(&mutex_clients);
while (clients_reading)
pthread_cond_wait(&cond_clients, &mutex_clients);
pool_clients.count += POOL_CLIENTS_GROW;
c = realloc(pool_clients.clients, sizeof(client_t) *
pool_clients.count);
pool_clients.clients = c;
c += pool_clients.count - POOL_CLIENTS_GROW;
memset(c, 0, sizeof(client_t) * POOL_CLIENTS_GROW);
c = gbag_get(bag_clients);
pthread_mutex_unlock(&mutex_clients);
log_debug("Client pool can now hold %zu clients", pool_clients.count);
log_debug("Client pool can now hold %zu clients",
gbag_max(bag_clients));
}
else
memset(c, 0, sizeof(client_t));
c = gbag_get(bag_clients);
c->fd = fd;
c->bev = bev;
c->connected_since = time(NULL);
c->downstream = downstream;
bstack_new(&c->active_jobs, CLIENT_JOBS_MAX, sizeof(job_t), job_recycle);
miner_count++;
if (!downstream)
pool_stats.connected_miners++;
if (upstream_event)
upstream_send_client_connect(1);
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_ADD_INT(clients_by_fd, fd, c);
pthread_rwlock_unlock(&rwlock_cfd);
}
static void
@ -2733,16 +2754,9 @@ client_find(struct bufferevent *bev, client_t **client)
*client = NULL;
return;
}
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->fd == fd)
{
*client = c;
return;
}
}
*client = NULL;
pthread_rwlock_rdlock(&rwlock_cfd);
HASH_FIND_INT(clients_by_fd, &fd, *client);
pthread_rwlock_unlock(&rwlock_cfd);
}
static void
@ -2753,15 +2767,31 @@ client_clear(struct bufferevent *bev)
if (!client)
return;
client_clear_jobs(client);
if (client->downstream)
pool_stats.connected_miners -= client->downstream_miners;
else
pool_stats.connected_miners--;
account_stats_t *stats = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(account_stats, client->address, stats);
pthread_rwlock_unlock(&rwlock_acc);
if (stats && stats->worker_count == 1)
{
if (client->downstream)
pool_stats.connected_miners -= client->downstream_miners;
else
pool_stats.connected_miners--;
if (upstream_event)
upstream_send_client_disconnect();
miner_count--;
pthread_rwlock_wrlock(&rwlock_acc);
HASH_DEL(account_stats, stats);
pthread_rwlock_unlock(&rwlock_acc);
gbag_put(bag_stats, stats);
}
else if (stats && stats->worker_count > 1)
stats->worker_count--;
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_DEL(clients_by_fd, client);
pthread_rwlock_unlock(&rwlock_cfd);
memset(client, 0, sizeof(client_t));
bufferevent_free(bev);
if (upstream_event)
upstream_send_client_disconnect();
miner_count--;
}
static void
@ -2831,6 +2861,28 @@ miner_on_login(json_object *message, client_t *client)
strncpy(client->address, address, sizeof(client->address)-1);
strncpy(client->worker_id, worker_id, sizeof(client->worker_id)-1);
account_stats_t *stats = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(account_stats, client->address, stats);
pthread_rwlock_unlock(&rwlock_acc);
if (!stats)
{
miner_count++;
if (!client->downstream)
pool_stats.connected_miners++;
if (upstream_event)
upstream_send_client_connect(1);
stats = gbag_get(bag_stats);
strncpy(stats->address, address, sizeof(stats->address)-1);
stats->worker_count = 1;
stats->connected_since = time(NULL);
stats->hashes = 0;
pthread_rwlock_wrlock(&rwlock_acc);
HASH_ADD_STR(account_stats, address, stats);
pthread_rwlock_unlock(&rwlock_acc);
}
else
stats->worker_count++;
uuid_t cid;
uuid_generate(cid);
bin_to_hex((const unsigned char*)cid, sizeof(uuid_t),
@ -3137,6 +3189,11 @@ post_hash:
BN_free(rh);
/* Process share */
account_stats_t *stats = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(account_stats, client->address, stats);
pthread_rwlock_unlock(&rwlock_acc);
stats->hashes += job->target;
client->hashes += job->target;
time_t now = time(NULL);
bool can_store = true;
@ -3156,7 +3213,7 @@ post_hash:
"\"submit_block\", \"params\":[\"%s\"]}",
block_hex);
rpc_callback_t *cb = rpc_callback_new(rpc_on_block_submitted, NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_block_submitted, 0, 0);
cb->data = calloc(1, sizeof(block_t));
block_t* b = (block_t*) cb->data;
b->height = bt->height;
@ -4077,7 +4134,7 @@ cleanup(void)
event_base_loopbreak(trusted_base);
if (pool_base)
event_base_free(pool_base);
pool_clients_free();
clients_free();
if (bsh)
bstack_free(bsh);
if (bst)
@ -4090,6 +4147,8 @@ cleanup(void)
pthread_mutex_destroy(&mutex_clients);
pthread_mutex_destroy(&mutex_log);
pthread_rwlock_destroy(&rwlock_tx);
pthread_rwlock_destroy(&rwlock_acc);
pthread_rwlock_destroy(&rwlock_cfd);
pthread_cond_destroy(&cond_clients);
log_info("Pool shutdown successfully");
if (fd_log)
@ -4252,7 +4311,7 @@ int main(int argc, char **argv)
uuid_generate(iid);
memcpy(&instance_id, iid, 4);
pool_clients_init();
clients_init();
wui_context_t uic;
uic.port = config.webui_port;

1150
src/uthash.h Normal file

File diff suppressed because it is too large Load Diff