Merge branch 'batch-tx'

This commit is contained in:
Jethro Grassie 2019-05-05 19:02:00 -04:00
commit f8b6245a39
No known key found for this signature in database
GPG key ID: DE8ED755616565BB
3 changed files with 106 additions and 46 deletions

View file

@ -196,9 +196,9 @@ typedef struct block_t
typedef struct payment_t typedef struct payment_t
{ {
char tx_hash[64];
uint64_t amount; uint64_t amount;
time_t timestamp; time_t timestamp;
char address[ADDRESS_MAX];
} payment_t; } payment_t;
typedef struct rpc_callback_t typedef struct rpc_callback_t
@ -783,6 +783,14 @@ send_payments()
return rc; return rc;
} }
size_t payments_count = 0;
size_t payments_max_count = 25;
size_t payments_size = payments_max_count * sizeof(payment_t);
payment_t *payments = (payment_t*) calloc(1, payments_size);
memset(payments, 0, payments_size);
payment_t *payment = payments;
payment_t *end_payment = payment + payments_max_count;
MDB_cursor_op op = MDB_FIRST; MDB_cursor_op op = MDB_FIRST;
while (1) while (1)
{ {
@ -801,19 +809,51 @@ send_payments()
log_info("Sending payment of %"PRIu64" to %s\n", amount, address); log_info("Sending payment of %"PRIu64" to %s\n", amount, address);
char body[RPC_BODY_MAX]; memcpy(payment->address, address, ADDRESS_MAX);
snprintf(body, RPC_BODY_MAX, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer\",\"params\":{" payment->amount = amount;
"\"destinations\":[{\"amount\":%"PRIu64",\"address\":\"%s\"}],\"mixin\":10}}", payments_count++;
amount, address);
log_trace(body); if (++payment == end_payment)
rpc_callback_t *callback = calloc(1, sizeof(rpc_callback_t)); {
callback->data = calloc(ADDRESS_MAX, sizeof(char)); payments_size <<= 1;
memcpy(callback->data, address, ADDRESS_MAX); payments = (payment_t*) realloc(payments, payments_size);
callback->cb = rpc_on_wallet_transferred; payment = payments + payments_max_count;
rpc_wallet_request(base, body, callback); memset(payment, 0, sizeof(payment_t) * payments_max_count);
payments_max_count <<= 1;
end_payment = payments + payments_max_count;
}
} }
mdb_cursor_close(cursor); mdb_cursor_close(cursor);
mdb_txn_abort(txn); mdb_txn_abort(txn);
if (payments_count)
{
size_t body_size = 160 * payments_count + 128;
char *body = (char*) alloca(body_size);
char *start = body;
char *end = body + body_size;
start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer_split\",\"params\":{"
"\"ring_size\":11,\"destinations\":[", end);
for (size_t i=0; i<payments_count; i++)
{
payment_t *p = &payments[i];
start = stecpy(start, "{\"address\":\"", end);
start = stecpy(start, p->address, end);
start = stecpy(start, "\",\"amount\":", end);
sprintf(start, "%"PRIu64"}", p->amount);
start = body + strlen(body);
if (i != payments_count -1)
start = stecpy(start, ",", end);
else
start = stecpy(start, "]}}", end);
}
log_trace(body);
rpc_callback_t *callback = calloc(1, sizeof(rpc_callback_t));
callback->data = payments;
callback->cb = rpc_on_wallet_transferred;
rpc_wallet_request(base, body, callback);
}
return 0; return 0;
} }
@ -1466,11 +1506,11 @@ rpc_on_block_submitted(const char* data, rpc_callback_t *callback)
JSON_GET_OR_WARN(message, error, json_type_string); JSON_GET_OR_WARN(message, error, json_type_string);
int ec = json_object_get_int(code); int ec = json_object_get_int(code);
const char *em = json_object_get_string(message); const char *em = json_object_get_string(message);
log_debug("Error (%d) with block submission: %s", ec, em); log_warn("Error (%d) with block submission: %s", ec, em);
} }
if (status == NULL || strcmp(ss, "OK") != 0) if (status == NULL || strcmp(ss, "OK") != 0)
{ {
log_debug("Error submitting block: %s", ss); log_warn("Error submitting block: %s", ss);
} }
pool_stats.pool_blocks_found++; pool_stats.pool_blocks_found++;
block_t *b = (block_t*)callback->data; block_t *b = (block_t*)callback->data;
@ -1486,7 +1526,6 @@ static void
rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
{ {
log_trace("Transfer response: \n%s", data); log_trace("Transfer response: \n%s", data);
const char* address = callback->data;
json_object *root = json_tokener_parse(data); json_object *root = json_tokener_parse(data);
JSON_GET_OR_WARN(result, root, json_type_object); JSON_GET_OR_WARN(result, root, json_type_object);
json_object *error = NULL; json_object *error = NULL;
@ -1506,6 +1545,8 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
char *err; char *err;
MDB_txn *txn; MDB_txn *txn;
MDB_cursor *cursor; MDB_cursor *cursor;
/* First, updated balance(s) */
if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
{ {
err = mdb_strerror(rc); err = mdb_strerror(rc);
@ -1519,38 +1560,43 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn); mdb_txn_abort(txn);
goto cleanup; goto cleanup;
} }
payment_t *payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
{
MDB_cursor_op op = MDB_SET; MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)address}; MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val; MDB_val val;
rc = mdb_cursor_get(cursor, &key, &val, op); rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND) if (rc == MDB_NOTFOUND)
{ {
log_error("Payment made to non-existent address"); log_error("Payment made to non-existent address");
mdb_txn_abort(txn); continue;
goto cleanup;
} }
else if (rc != 0 && rc != MDB_NOTFOUND) else if (rc != 0 && rc != MDB_NOTFOUND)
{ {
err = mdb_strerror(rc); err = mdb_strerror(rc);
log_error("%s", err); log_error("%s", err);
continue;
}
uint64_t current_amount = *(uint64_t*)val.mv_data;
current_amount -= payment->amount;
MDB_val new_val = {sizeof(current_amount), (void*)&current_amount};
rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
if (rc != 0)
{
err = mdb_strerror(rc);
log_error("%s", err);
}
}
if ((rc = mdb_txn_commit(txn)) != 0)
{
err = mdb_strerror(rc);
log_error("Error committing updated balance(s): %s", err);
mdb_txn_abort(txn); mdb_txn_abort(txn);
goto cleanup; goto cleanup;
} }
mdb_cursor_del(cursor, 0);
mdb_txn_commit(txn);
/* Now store payment info */ /* Now store payment info */
JSON_GET_OR_WARN(tx_hash, result, json_type_string);
JSON_GET_OR_WARN(amount, result, json_type_int);
const char *ths = json_object_get_string(tx_hash);
uint64_t ai = json_object_get_int64(amount);
time_t now = time(NULL);
payment_t payment;
memcpy(payment.tx_hash, ths, sizeof(payment.tx_hash));
payment.amount = ai;
payment.timestamp = now;
if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0)
{ {
err = mdb_strerror(rc); err = mdb_strerror(rc);
@ -1564,16 +1610,19 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn); mdb_txn_abort(txn);
goto cleanup; goto cleanup;
} }
key.mv_data = (void*)address; time_t now = time(NULL);
key.mv_size = ADDRESS_MAX; payment = (payment_t*) callback->data;
val.mv_data = &payment; for (; payment->amount; payment++)
val.mv_size = sizeof(payment); {
payment->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val = {sizeof(payment_t), payment};
if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0) if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0)
{ {
err = mdb_strerror(rc); err = mdb_strerror(rc);
log_error("Error putting payment: %s", err); log_error("Error putting payment: %s", err);
mdb_txn_abort(txn); continue;
goto cleanup; }
} }
if ((rc = mdb_txn_commit(txn)) != 0) if ((rc = mdb_txn_commit(txn)) != 0)
{ {
@ -1582,6 +1631,7 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn); mdb_txn_abort(txn);
goto cleanup; goto cleanup;
} }
cleanup: cleanup:
json_object_put(root); json_object_put(root);
} }

View file

@ -100,3 +100,12 @@ reverse_bin(char *bin, size_t len)
} }
} }
char *stecpy(char *dst, const char *src, const char *end)
{
while (*src && dst < end)
*dst++ = *src++;
if (dst < end)
*dst = '\0';
return dst;
}

View file

@ -37,5 +37,6 @@ int is_hex_string(const char *str);
void hex_to_bin(const char *hex, char *bin, size_t bin_size); void hex_to_bin(const char *hex, char *bin, size_t bin_size);
void bin_to_hex(const char *bin, size_t bin_size, char *hex); void bin_to_hex(const char *bin, size_t bin_size, char *hex);
void reverse_bin(char *bin, size_t len); void reverse_bin(char *bin, size_t len);
char *stecpy(char *dst, const char *src, const char *end);
#endif #endif