batch transfers

Use `transfer_split` instead of `transfer`. The downside is we can no
longer store the tx hash in payments table. This is because we have no
way of knowing which returned tx hash contains the destination for a
payment. Wallet stores tx hash anyway, so not an issue.
This commit is contained in:
Jethro Grassie 2019-05-05 16:58:08 -04:00
parent 5d84d80c61
commit daf01452ff
No known key found for this signature in database
GPG key ID: DE8ED755616565BB
3 changed files with 106 additions and 46 deletions

View file

@ -196,9 +196,9 @@ typedef struct block_t
typedef struct payment_t
{
char tx_hash[64];
uint64_t amount;
time_t timestamp;
char address[ADDRESS_MAX];
} payment_t;
typedef struct rpc_callback_t
@ -783,6 +783,14 @@ send_payments()
return rc;
}
size_t payments_count = 0;
size_t payments_max_count = 25;
size_t payments_size = payments_max_count * sizeof(payment_t);
payment_t *payments = (payment_t*) calloc(1, payments_size);
memset(payments, 0, payments_size);
payment_t *payment = payments;
payment_t *end_payment = payment + payments_max_count;
MDB_cursor_op op = MDB_FIRST;
while (1)
{
@ -801,19 +809,51 @@ send_payments()
log_info("Sending payment of %"PRIu64" to %s\n", amount, address);
char body[RPC_BODY_MAX];
snprintf(body, RPC_BODY_MAX, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer\",\"params\":{"
"\"destinations\":[{\"amount\":%"PRIu64",\"address\":\"%s\"}],\"mixin\":10}}",
amount, address);
log_trace(body);
rpc_callback_t *callback = calloc(1, sizeof(rpc_callback_t));
callback->data = calloc(ADDRESS_MAX, sizeof(char));
memcpy(callback->data, address, ADDRESS_MAX);
callback->cb = rpc_on_wallet_transferred;
rpc_wallet_request(base, body, callback);
memcpy(payment->address, address, ADDRESS_MAX);
payment->amount = amount;
payments_count++;
if (++payment == end_payment)
{
payments_size <<= 1;
payments = (payment_t*) realloc(payments, payments_size);
payment = payments + payments_max_count;
memset(payment, 0, sizeof(payment_t) * payments_max_count);
payments_max_count <<= 1;
end_payment = payments + payments_max_count;
}
}
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
if (payments_count)
{
size_t body_size = 160 * payments_count + 128;
char *body = (char*) alloca(body_size);
char *start = body;
char *end = body + body_size;
start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer_split\",\"params\":{"
"\"ring_size\":11,\"destinations\":[", end);
for (size_t i=0; i<payments_count; i++)
{
payment_t *p = &payments[i];
start = stecpy(start, "{\"address\":\"", end);
start = stecpy(start, p->address, end);
start = stecpy(start, "\",\"amount\":", end);
sprintf(start, "%"PRIu64"}", p->amount);
start = body + strlen(body);
if (i != payments_count -1)
start = stecpy(start, ",", end);
else
start = stecpy(start, "]}}", end);
}
log_trace(body);
rpc_callback_t *callback = calloc(1, sizeof(rpc_callback_t));
callback->data = payments;
callback->cb = rpc_on_wallet_transferred;
rpc_wallet_request(base, body, callback);
}
return 0;
}
@ -1466,11 +1506,11 @@ rpc_on_block_submitted(const char* data, rpc_callback_t *callback)
JSON_GET_OR_WARN(message, error, json_type_string);
int ec = json_object_get_int(code);
const char *em = json_object_get_string(message);
log_debug("Error (%d) with block submission: %s", ec, em);
log_warn("Error (%d) with block submission: %s", ec, em);
}
if (status == NULL || strcmp(ss, "OK") != 0)
{
log_debug("Error submitting block: %s", ss);
log_warn("Error submitting block: %s", ss);
}
pool_stats.pool_blocks_found++;
block_t *b = (block_t*)callback->data;
@ -1486,7 +1526,6 @@ static void
rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
{
log_trace("Transfer response: \n%s", data);
const char* address = callback->data;
json_object *root = json_tokener_parse(data);
JSON_GET_OR_WARN(result, root, json_type_object);
json_object *error = NULL;
@ -1506,6 +1545,8 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
char *err;
MDB_txn *txn;
MDB_cursor *cursor;
/* First, updated balance(s) */
if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
{
err = mdb_strerror(rc);
@ -1519,38 +1560,43 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn);
goto cleanup;
}
MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)address};
MDB_val val;
rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND)
payment_t *payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
{
log_error("Payment made to non-existent address");
mdb_txn_abort(txn);
goto cleanup;
MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val;
rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND)
{
log_error("Payment made to non-existent address");
continue;
}
else if (rc != 0 && rc != MDB_NOTFOUND)
{
err = mdb_strerror(rc);
log_error("%s", err);
continue;
}
uint64_t current_amount = *(uint64_t*)val.mv_data;
current_amount -= payment->amount;
MDB_val new_val = {sizeof(current_amount), (void*)&current_amount};
rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
if (rc != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
}
}
else if (rc != 0 && rc != MDB_NOTFOUND)
if ((rc = mdb_txn_commit(txn)) != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
log_error("Error committing updated balance(s): %s", err);
mdb_txn_abort(txn);
goto cleanup;
}
mdb_cursor_del(cursor, 0);
mdb_txn_commit(txn);
/* Now store payment info */
JSON_GET_OR_WARN(tx_hash, result, json_type_string);
JSON_GET_OR_WARN(amount, result, json_type_int);
const char *ths = json_object_get_string(tx_hash);
uint64_t ai = json_object_get_int64(amount);
time_t now = time(NULL);
payment_t payment;
memcpy(payment.tx_hash, ths, sizeof(payment.tx_hash));
payment.amount = ai;
payment.timestamp = now;
if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
{
err = mdb_strerror(rc);
@ -1564,16 +1610,19 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn);
goto cleanup;
}
key.mv_data = (void*)address;
key.mv_size = ADDRESS_MAX;
val.mv_data = &payment;
val.mv_size = sizeof(payment);
if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0)
time_t now = time(NULL);
payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
{
err = mdb_strerror(rc);
log_error("Error putting payment: %s", err);
mdb_txn_abort(txn);
goto cleanup;
payment->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val = {sizeof(payment_t), payment};
if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0)
{
err = mdb_strerror(rc);
log_error("Error putting payment: %s", err);
continue;
}
}
if ((rc = mdb_txn_commit(txn)) != 0)
{
@ -1582,6 +1631,7 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn);
goto cleanup;
}
cleanup:
json_object_put(root);
}

View file

@ -100,3 +100,12 @@ reverse_bin(char *bin, size_t len)
}
}
char *stecpy(char *dst, const char *src, const char *end)
{
while (*src && dst < end)
*dst++ = *src++;
if (dst < end)
*dst = '\0';
return dst;
}

View file

@ -37,5 +37,6 @@ int is_hex_string(const char *str);
void hex_to_bin(const char *hex, char *bin, size_t bin_size);
void bin_to_hex(const char *bin, size_t bin_size, char *hex);
void reverse_bin(char *bin, size_t len);
char *stecpy(char *dst, const char *src, const char *end);
#endif