mirror of
				https://git.wownero.com/wownero/wownero.git
				synced 2024-08-15 01:03:23 +00:00 
			
		
		
		
	Publish submitted txs via zmq
This commit is contained in:
		
							parent
							
								
									b6a029f222
								
							
						
					
					
						commit
						8cc3c9af4d
					
				
					 10 changed files with 142 additions and 11 deletions
				
			
		
							
								
								
									
										2
									
								
								.github/workflows/build.yml
									
										
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/build.yml
									
										
									
									
										vendored
									
									
								
							| 
						 | 
				
			
			@ -151,7 +151,7 @@ jobs:
 | 
			
		|||
    - name: install monero dependencies
 | 
			
		||||
      run: ${{env.APT_INSTALL_LINUX}}
 | 
			
		||||
    - name: install Python dependencies
 | 
			
		||||
      run: pip install requests psutil monotonic
 | 
			
		||||
      run: pip install requests psutil monotonic zmq
 | 
			
		||||
    - name: tests
 | 
			
		||||
      env:
 | 
			
		||||
        CTEST_OUTPUT_ON_FAILURE: ON
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1406,21 +1406,66 @@ namespace cryptonote
 | 
			
		|||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
  //-----------------------------------------------------------------------------------------------
 | 
			
		||||
  bool core::notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const
 | 
			
		||||
  {
 | 
			
		||||
    if (!m_zmq_pub)
 | 
			
		||||
      return true;
 | 
			
		||||
 | 
			
		||||
    if (tx_blobs.size() != tx_hashes.size() || tx_blobs.size() != txs.size() || tx_blobs.size() != just_broadcasted.size())
 | 
			
		||||
      return false;
 | 
			
		||||
 | 
			
		||||
    /* Publish txs via ZMQ that are "just broadcasted" by the daemon. This is
 | 
			
		||||
       done here in addition to `handle_incoming_txs` in order to guarantee txs
 | 
			
		||||
       are pub'd via ZMQ when we know the daemon has/will broadcast to other
 | 
			
		||||
       nodes & *after* the tx is visible in the pool. This should get called
 | 
			
		||||
       when the user submits a tx to a daemon in the "fluff" epoch relaying txs
 | 
			
		||||
       via a public network. */
 | 
			
		||||
    if (std::count(just_broadcasted.begin(), just_broadcasted.end(), true) == 0)
 | 
			
		||||
      return true;
 | 
			
		||||
 | 
			
		||||
    std::vector<txpool_event> results{};
 | 
			
		||||
    results.resize(tx_blobs.size());
 | 
			
		||||
    for (std::size_t i = 0; i < results.size(); ++i)
 | 
			
		||||
    {
 | 
			
		||||
      results[i].tx = std::move(txs[i]);
 | 
			
		||||
      results[i].hash = std::move(tx_hashes[i]);
 | 
			
		||||
      results[i].blob_size = tx_blobs[i].size();
 | 
			
		||||
      results[i].weight = results[i].tx.pruned ? get_pruned_transaction_weight(results[i].tx) : get_transaction_weight(results[i].tx, results[i].blob_size);
 | 
			
		||||
      results[i].res = just_broadcasted[i];
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    m_zmq_pub(std::move(results));
 | 
			
		||||
 | 
			
		||||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
  //-----------------------------------------------------------------------------------------------
 | 
			
		||||
  void core::on_transactions_relayed(const epee::span<const cryptonote::blobdata> tx_blobs, const relay_method tx_relay)
 | 
			
		||||
  {
 | 
			
		||||
    // lock ensures duplicate txs aren't pub'd via zmq
 | 
			
		||||
    CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
 | 
			
		||||
 | 
			
		||||
    std::vector<crypto::hash> tx_hashes{};
 | 
			
		||||
    tx_hashes.resize(tx_blobs.size());
 | 
			
		||||
 | 
			
		||||
    std::vector<cryptonote::transaction> txs{};
 | 
			
		||||
    txs.resize(tx_blobs.size());
 | 
			
		||||
 | 
			
		||||
    for (std::size_t i = 0; i < tx_blobs.size(); ++i)
 | 
			
		||||
    {
 | 
			
		||||
      cryptonote::transaction tx{};
 | 
			
		||||
      if (!parse_and_validate_tx_from_blob(tx_blobs[i], tx, tx_hashes[i]))
 | 
			
		||||
      if (!parse_and_validate_tx_from_blob(tx_blobs[i], txs[i], tx_hashes[i]))
 | 
			
		||||
      {
 | 
			
		||||
        LOG_ERROR("Failed to parse relayed transaction");
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay);
 | 
			
		||||
 | 
			
		||||
    std::vector<bool> just_broadcasted{};
 | 
			
		||||
    just_broadcasted.reserve(tx_hashes.size());
 | 
			
		||||
 | 
			
		||||
    m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay, just_broadcasted);
 | 
			
		||||
 | 
			
		||||
    if (m_zmq_pub && matches_category(tx_relay, relay_category::legacy))
 | 
			
		||||
      notify_txpool_event(tx_blobs, epee::to_span(tx_hashes), epee::to_span(txs), just_broadcasted);
 | 
			
		||||
  }
 | 
			
		||||
  //-----------------------------------------------------------------------------------------------
 | 
			
		||||
  bool core::get_block_template(block& b, const account_public_address& adr, difficulty_type& diffic, uint64_t& height, uint64_t& expected_reward, const blobdata& ex_nonce, uint64_t &seed_height, crypto::hash &seed_hash)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1035,6 +1035,13 @@ namespace cryptonote
 | 
			
		|||
      */
 | 
			
		||||
     bool relay_txpool_transactions();
 | 
			
		||||
 | 
			
		||||
     /**
 | 
			
		||||
      * @brief sends notification of txpool events to subscribers
 | 
			
		||||
      *
 | 
			
		||||
      * @return true on success, false otherwise
 | 
			
		||||
      */
 | 
			
		||||
     bool notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const;
 | 
			
		||||
 | 
			
		||||
     /**
 | 
			
		||||
      * @brief checks DNS versions
 | 
			
		||||
      *
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -820,8 +820,10 @@ namespace cryptonote
 | 
			
		|||
    return true;
 | 
			
		||||
  }
 | 
			
		||||
  //---------------------------------------------------------------------------------
 | 
			
		||||
  void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method)
 | 
			
		||||
  void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method, std::vector<bool> &just_broadcasted)
 | 
			
		||||
  {
 | 
			
		||||
    just_broadcasted.clear();
 | 
			
		||||
 | 
			
		||||
    crypto::random_poisson_seconds embargo_duration{dandelionpp_embargo_average};
 | 
			
		||||
    const auto now = std::chrono::system_clock::now();
 | 
			
		||||
    uint64_t next_relay = uint64_t{std::numeric_limits<time_t>::max()};
 | 
			
		||||
| 
						 | 
				
			
			@ -831,12 +833,14 @@ namespace cryptonote
 | 
			
		|||
    LockedTXN lock(m_blockchain.get_db());
 | 
			
		||||
    for (const auto& hash : hashes)
 | 
			
		||||
    {
 | 
			
		||||
      bool was_just_broadcasted = false;
 | 
			
		||||
      try
 | 
			
		||||
      {
 | 
			
		||||
        txpool_tx_meta_t meta;
 | 
			
		||||
        if (m_blockchain.get_txpool_tx_meta(hash, meta))
 | 
			
		||||
        {
 | 
			
		||||
          // txes can be received as "stem" or "fluff" in either order
 | 
			
		||||
          const bool already_broadcasted = meta.matches(relay_category::broadcasted);
 | 
			
		||||
          meta.upgrade_relay_method(method);
 | 
			
		||||
          meta.relayed = true;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -849,6 +853,9 @@ namespace cryptonote
 | 
			
		|||
            meta.last_relayed_time = std::chrono::system_clock::to_time_t(now);
 | 
			
		||||
 | 
			
		||||
          m_blockchain.update_txpool_tx(hash, meta);
 | 
			
		||||
 | 
			
		||||
          // wait until db update succeeds to ensure tx is visible in the pool
 | 
			
		||||
          was_just_broadcasted = !already_broadcasted && meta.matches(relay_category::broadcasted);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      catch (const std::exception &e)
 | 
			
		||||
| 
						 | 
				
			
			@ -856,6 +863,7 @@ namespace cryptonote
 | 
			
		|||
        MERROR("Failed to update txpool transaction metadata: " << e.what());
 | 
			
		||||
        // continue
 | 
			
		||||
      }
 | 
			
		||||
      just_broadcasted.emplace_back(was_just_broadcasted);
 | 
			
		||||
    }
 | 
			
		||||
    lock.commit();
 | 
			
		||||
    set_if_less(m_next_check, time_t(next_relay));
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -353,8 +353,10 @@ namespace cryptonote
 | 
			
		|||
     *
 | 
			
		||||
     * @param hashes list of tx hashes that are about to be relayed
 | 
			
		||||
     * @param tx_relay update how the tx left this node
 | 
			
		||||
     * @param just_broadcasted true if a tx was just broadcasted
 | 
			
		||||
     *
 | 
			
		||||
     */
 | 
			
		||||
    void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay);
 | 
			
		||||
    void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay, std::vector<bool> &just_broadcasted);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * @brief get the total number of transactions in the pool
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -54,7 +54,7 @@ Functional tests are located under the `tests/functional_tests` directory.
 | 
			
		|||
 | 
			
		||||
Building all the tests requires installing the following dependencies:
 | 
			
		||||
```bash
 | 
			
		||||
pip install requests psutil monotonic
 | 
			
		||||
pip install requests psutil monotonic zmq
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
First, run a regtest daemon in the offline mode and with a fixed difficulty:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -67,7 +67,7 @@ target_link_libraries(make_test_signature
 | 
			
		|||
monero_add_minimal_executable(cpu_power_test cpu_power_test.cpp)
 | 
			
		||||
find_program(PYTHON3_FOUND python3 REQUIRED)
 | 
			
		||||
 | 
			
		||||
execute_process(COMMAND ${PYTHON3_FOUND} "-c" "import requests; import psutil; import monotonic; print('OK')" OUTPUT_VARIABLE REQUESTS_OUTPUT OUTPUT_STRIP_TRAILING_WHITESPACE)
 | 
			
		||||
execute_process(COMMAND ${PYTHON3_FOUND} "-c" "import requests; import psutil; import monotonic; import zmq; print('OK')" OUTPUT_VARIABLE REQUESTS_OUTPUT OUTPUT_STRIP_TRAILING_WHITESPACE)
 | 
			
		||||
if (REQUESTS_OUTPUT STREQUAL "OK")
 | 
			
		||||
  add_test(
 | 
			
		||||
    NAME    functional_tests_rpc
 | 
			
		||||
| 
						 | 
				
			
			@ -76,6 +76,6 @@ if (REQUESTS_OUTPUT STREQUAL "OK")
 | 
			
		|||
    NAME    check_missing_rpc_methods
 | 
			
		||||
    COMMAND ${PYTHON3_FOUND} "${CMAKE_CURRENT_SOURCE_DIR}/check_missing_rpc_methods.py" "${CMAKE_SOURCE_DIR}")
 | 
			
		||||
else()
 | 
			
		||||
  message(WARNING "functional_tests_rpc and check_missing_rpc_methods skipped, needs the 'requests', 'psutil' and 'monotonic' python modules")
 | 
			
		||||
  message(WARNING "functional_tests_rpc and check_missing_rpc_methods skipped, needs the 'requests', 'psutil', 'monotonic', and 'zmq' python modules")
 | 
			
		||||
  set(CTEST_CUSTOM_TESTS_IGNORE ${CTEST_CUSTOM_TESTS_IGNORE} functional_tests_rpc check_missing_rpc_methods)
 | 
			
		||||
endif()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,7 +47,7 @@ WALLET_DIRECTORY = builddir + "/functional-tests-directory"
 | 
			
		|||
FUNCTIONAL_TESTS_DIRECTORY = builddir + "/tests/functional_tests"
 | 
			
		||||
DIFFICULTY = 10
 | 
			
		||||
 | 
			
		||||
monerod_base = [builddir + "/bin/monerod", "--regtest", "--fixed-difficulty", str(DIFFICULTY), "--no-igd", "--p2p-bind-port", "monerod_p2p_port", "--rpc-bind-port", "monerod_rpc_port", "--zmq-rpc-bind-port", "monerod_zmq_port", "--non-interactive", "--disable-dns-checkpoints", "--check-updates", "disabled", "--rpc-ssl", "disabled", "--data-dir", "monerod_data_dir", "--log-level", "1"]
 | 
			
		||||
monerod_base = [builddir + "/bin/monerod", "--regtest", "--fixed-difficulty", str(DIFFICULTY), "--no-igd", "--p2p-bind-port", "monerod_p2p_port", "--rpc-bind-port", "monerod_rpc_port", "--zmq-rpc-bind-port", "monerod_zmq_port", "--zmq-pub", "monerod_zmq_pub", "--non-interactive", "--disable-dns-checkpoints", "--check-updates", "disabled", "--rpc-ssl", "disabled", "--data-dir", "monerod_data_dir", "--log-level", "1"]
 | 
			
		||||
monerod_extra = [
 | 
			
		||||
  ["--offline"],
 | 
			
		||||
  ["--rpc-payment-address", "44SKxxLQw929wRF6BA9paQ1EWFshNnKhXM3qz6Mo3JGDE2YG3xyzVutMStEicxbQGRfrYvAAYxH6Fe8rnD56EaNwUiqhcwR", "--rpc-payment-difficulty", str(DIFFICULTY), "--rpc-payment-credits", "5000", "--offline"],
 | 
			
		||||
| 
						 | 
				
			
			@ -69,7 +69,7 @@ outputs = []
 | 
			
		|||
ports = []
 | 
			
		||||
 | 
			
		||||
for i in range(N_MONERODS):
 | 
			
		||||
  command_lines.append([str(18180+i) if x == "monerod_rpc_port" else str(18280+i) if x == "monerod_p2p_port" else str(18380+i) if x == "monerod_zmq_port" else builddir + "/functional-tests-directory/monerod" + str(i) if x == "monerod_data_dir" else x for x in monerod_base])
 | 
			
		||||
  command_lines.append([str(18180+i) if x == "monerod_rpc_port" else str(18280+i) if x == "monerod_p2p_port" else str(18380+i) if x == "monerod_zmq_port" else "tcp://127.0.0.1:" + str(18480+i) if x == "monerod_zmq_pub" else builddir + "/functional-tests-directory/monerod" + str(i) if x == "monerod_data_dir" else x for x in monerod_base])
 | 
			
		||||
  if i < len(monerod_extra):
 | 
			
		||||
    command_lines[-1] += monerod_extra[i]
 | 
			
		||||
  outputs.append(open(FUNCTIONAL_TESTS_DIRECTORY + '/monerod' + str(i) + '.log', 'a+'))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -35,6 +35,7 @@ from __future__ import print_function
 | 
			
		|||
 | 
			
		||||
from framework.daemon import Daemon
 | 
			
		||||
from framework.wallet import Wallet
 | 
			
		||||
from framework.zmq import Zmq
 | 
			
		||||
 | 
			
		||||
class TransferTest():
 | 
			
		||||
    def run_test(self):
 | 
			
		||||
| 
						 | 
				
			
			@ -105,6 +106,10 @@ class TransferTest():
 | 
			
		|||
    def check_txpool(self):
 | 
			
		||||
        daemon = Daemon()
 | 
			
		||||
        wallet = Wallet()
 | 
			
		||||
        zmq = Zmq()
 | 
			
		||||
 | 
			
		||||
        zmq_topic = "json-minimal-txpool_add"
 | 
			
		||||
        zmq.sub(zmq_topic)
 | 
			
		||||
 | 
			
		||||
        res = daemon.get_info()
 | 
			
		||||
        height = res.height
 | 
			
		||||
| 
						 | 
				
			
			@ -142,6 +147,21 @@ class TransferTest():
 | 
			
		|||
            min_bytes = min(min_bytes, x.blob_size)
 | 
			
		||||
            max_bytes = max(max_bytes, x.blob_size)
 | 
			
		||||
 | 
			
		||||
        print('Checking all txs received via zmq')
 | 
			
		||||
        for i in range(len(txes.keys())):
 | 
			
		||||
            zmq_event = zmq.recv(zmq_topic)
 | 
			
		||||
            assert len(zmq_event) == 1
 | 
			
		||||
 | 
			
		||||
            zmq_tx = zmq_event[0]
 | 
			
		||||
 | 
			
		||||
            x = [x for x in res.transactions if x.id_hash == zmq_tx["id"]]
 | 
			
		||||
            assert len(x) == 1
 | 
			
		||||
 | 
			
		||||
            x = x[0]
 | 
			
		||||
            assert x.blob_size == zmq_tx["blob_size"]
 | 
			
		||||
            assert x.weight == zmq_tx["weight"]
 | 
			
		||||
            assert x.fee == zmq_tx["fee"]
 | 
			
		||||
 | 
			
		||||
        res = daemon.get_transaction_pool_hashes()
 | 
			
		||||
        assert sorted(res.tx_hashes) == sorted(txes.keys())
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										49
									
								
								utils/python-rpc/framework/zmq.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								utils/python-rpc/framework/zmq.py
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,49 @@
 | 
			
		|||
# Copyright (c) 2018-2022, The Monero Project
 | 
			
		||||
 | 
			
		||||
# 
 | 
			
		||||
# All rights reserved.
 | 
			
		||||
# 
 | 
			
		||||
# Redistribution and use in source and binary forms, with or without modification, are
 | 
			
		||||
# permitted provided that the following conditions are met:
 | 
			
		||||
# 
 | 
			
		||||
# 1. Redistributions of source code must retain the above copyright notice, this list of
 | 
			
		||||
#    conditions and the following disclaimer.
 | 
			
		||||
# 
 | 
			
		||||
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
 | 
			
		||||
#    of conditions and the following disclaimer in the documentation and/or other
 | 
			
		||||
#    materials provided with the distribution.
 | 
			
		||||
# 
 | 
			
		||||
# 3. Neither the name of the copyright holder nor the names of its contributors may be
 | 
			
		||||
#    used to endorse or promote products derived from this software without specific
 | 
			
		||||
#    prior written permission.
 | 
			
		||||
# 
 | 
			
		||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
 | 
			
		||||
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 | 
			
		||||
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
 | 
			
		||||
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
			
		||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 | 
			
		||||
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 | 
			
		||||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 | 
			
		||||
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 | 
			
		||||
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
			
		||||
 | 
			
		||||
"""Class to subscribe to and receive ZMQ events."""
 | 
			
		||||
 | 
			
		||||
import zmq
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
class Zmq(object):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, protocol='tcp', host='127.0.0.1', port=0, idx=0):
 | 
			
		||||
        self.host = host
 | 
			
		||||
        self.port = port
 | 
			
		||||
        self.socket = zmq.Context().socket(zmq.SUB)
 | 
			
		||||
        self.socket.connect('{protocol}://{host}:{port}'.format(protocol=protocol, host=host, port=port if port else 18480+idx))
 | 
			
		||||
 | 
			
		||||
    def sub(self, topic):
 | 
			
		||||
        self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
 | 
			
		||||
 | 
			
		||||
    def recv(self, topic):
 | 
			
		||||
        msg = self.socket.recv()
 | 
			
		||||
        data = msg.decode().split(topic + ":")[1]
 | 
			
		||||
        return json.loads(data)
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue