upstream: sync a count with connect

This commit is contained in:
Jethro Grassie 2020-04-26 19:41:19 -04:00
parent 97c3993cf7
commit 61f1abbc30
No known key found for this signature in database
GPG Key ID: DE8ED755616565BB
1 changed files with 33 additions and 11 deletions

View File

@ -2170,14 +2170,16 @@ upstream_send_ping()
}
static void
upstream_send_client_connect()
upstream_send_client_connect(uint32_t count)
{
struct evbuffer *output = bufferevent_get_output(upstream_event);
char data[9];
size_t z = 9 + sizeof(uint32_t);
char data[z];
int t = BIN_CONNECT;
memcpy(data, msgbin, 8);
memcpy(data+8, &t, 1);
evbuffer_add(output, data, 9);
memcpy(data+9, &count, z-9);
evbuffer_add(output, data, z);
log_trace("Sending message connect upstream");
}
@ -2322,19 +2324,23 @@ upstream_send_backlog()
mdb_cursor_close(curshr);
mdb_cursor_close(curblk);
mdb_txn_abort(txn);
upstream_send_client_connect(pool_stats.connected_miners);
}
static void
trusted_on_client_connect(client_t *client)
{
pool_stats.connected_miners++;
client->downstream_miners++;
struct evbuffer *input = bufferevent_get_input(client->bev);
uint32_t count;
evbuffer_remove(input, &count, sizeof(uint32_t));
pool_stats.connected_miners += count;
client->downstream_miners += count;
log_trace("Downstream miner connected. "
"Miner count: %d, Pool hashrate: %"PRIu64,
pool_stats.connected_miners, pool_stats.pool_hashrate);
trusted_send_stats(client);
if (upstream_event)
upstream_send_client_connect();
upstream_send_client_connect(count);
}
static void
@ -2457,20 +2463,25 @@ upstream_on_read(struct bufferevent *bev, void *ctx)
return;
}
evbuffer_remove(input, tnt, 9);
evbuffer_copyout(input, tnt, 9);
switch (tnt[8])
{
case BIN_STATS:
if (len - 9 < sizeof(pool_stats_t))
return;
evbuffer_drain(input, 9);
upstream_on_stats(bev);
break;
case BIN_BALANCE:
if (len - 9 < sizeof(uint64_t)+ADDRESS_MAX)
return;
evbuffer_drain(input, 9);
upstream_on_balance(bev);
break;
default:
log_error("Unsupported message type: %d", tnt[8]);
evbuffer_drain(input, evbuffer_get_length(input));
evbuffer_drain(input, len);
return;
}
}
@ -2610,7 +2621,7 @@ client_add(int fd, struct bufferevent *bev, bool downstream)
if (!downstream)
pool_stats.connected_miners++;
if (upstream_event)
upstream_send_client_connect();
upstream_send_client_connect(1);
}
static void
@ -3234,24 +3245,35 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
return;
}
evbuffer_remove(input, tnt, 9);
evbuffer_copyout(input, tnt, 9);
log_trace("Downstream message: %d", tnt[8]);
switch (tnt[8])
{
case BIN_PING:
case BIN_STATS:
evbuffer_drain(input, 9);
trusted_send_stats(client);
break;
case BIN_CONNECT:
if (len - 9 < sizeof(uint32_t))
return;
evbuffer_drain(input, 9);
trusted_on_client_connect(client);
break;
case BIN_DISCONNECT:
evbuffer_drain(input, 9);
trusted_on_client_disconnect(client);
break;
case BIN_SHARE:
if (len - 9 < sizeof(share_t))
return;
evbuffer_drain(input, 9);
trusted_on_client_share(client);
break;
case BIN_BLOCK:
if (len - 9 < sizeof(block_t))
return;
evbuffer_drain(input, 9);
trusted_on_client_block(client);
break;
default: