Merge pull request #7228 from vtnerd/fix/p2p_memory_usage_017

Add aggressive restrictions to pre-handshake p2p buffer limit
This commit is contained in:
binaryFate 2020-12-31 21:35:00 +01:00 committed by GitHub
commit 00b3502749
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 40 additions and 11 deletions

View file

@ -72,7 +72,8 @@ namespace levin
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED 0 #define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED 0
#define LEVIN_DEFAULT_MAX_PACKET_SIZE 100000000 //100MB by default #define LEVIN_INITIAL_MAX_PACKET_SIZE 256*1024 // 256 KiB before handshake
#define LEVIN_DEFAULT_MAX_PACKET_SIZE 100000000 //100MB by default after handshake
#define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_REQUEST 0x00000001
#define LEVIN_PACKET_RESPONSE 0x00000002 #define LEVIN_PACKET_RESPONSE 0x00000002

View file

@ -84,6 +84,7 @@ class async_protocol_handler_config
public: public:
typedef t_connection_context connection_context; typedef t_connection_context connection_context;
uint64_t m_initial_max_packet_size;
uint64_t m_max_packet_size; uint64_t m_max_packet_size;
uint64_t m_invoke_timeout; uint64_t m_invoke_timeout;
@ -105,7 +106,7 @@ public:
size_t get_in_connections_count(); size_t get_in_connections_count();
void set_handler(levin_commands_handler<t_connection_context>* handler, void (*destroy)(levin_commands_handler<t_connection_context>*) = NULL); void set_handler(levin_commands_handler<t_connection_context>* handler, void (*destroy)(levin_commands_handler<t_connection_context>*) = NULL);
async_protocol_handler_config():m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE), m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) async_protocol_handler_config():m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL), m_initial_max_packet_size(LEVIN_INITIAL_MAX_PACKET_SIZE), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE), m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{} {}
~async_protocol_handler_config() { set_handler(NULL, NULL); } ~async_protocol_handler_config() { set_handler(NULL, NULL); }
void del_out_connections(size_t count); void del_out_connections(size_t count);
@ -162,6 +163,7 @@ public:
net_utils::i_service_endpoint* m_pservice_endpoint; net_utils::i_service_endpoint* m_pservice_endpoint;
config_type& m_config; config_type& m_config;
t_connection_context& m_connection_context; t_connection_context& m_connection_context;
std::atomic<uint64_t> m_max_packet_size;
net_utils::buffer m_cache_in_buffer; net_utils::buffer m_cache_in_buffer;
stream_state m_state; stream_state m_state;
@ -290,6 +292,7 @@ public:
m_pservice_endpoint(psnd_hndlr), m_pservice_endpoint(psnd_hndlr),
m_config(config), m_config(config),
m_connection_context(conn_context), m_connection_context(conn_context),
m_max_packet_size(config.m_initial_max_packet_size),
m_cache_in_buffer(4 * 1024), m_cache_in_buffer(4 * 1024),
m_state(stream_state_head) m_state(stream_state_head)
{ {
@ -399,13 +402,14 @@ public:
} }
// these should never fail, but do runtime check for safety // these should never fail, but do runtime check for safety
CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); const uint64_t max_packet_size = m_max_packet_size;
CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); CHECK_AND_ASSERT_MES(max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()");
CHECK_AND_ASSERT_MES(max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()");
// flipped to subtraction; prevent overflow since m_max_packet_size is variable and public // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public
if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) if(cb > max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size())
{ {
MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << max_packet_size
<< ", packet received " << m_cache_in_buffer.size() + cb << ", packet received " << m_cache_in_buffer.size() + cb
<< ", connection will be closed."); << ", connection will be closed.");
return false; return false;
@ -519,6 +523,10 @@ public:
m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context
); );
// peer_id remains unset if dropped
if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
m_max_packet_size = m_config.m_max_packet_size;
bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
head.m_return_code = SWAP32LE(return_code); head.m_return_code = SWAP32LE(return_code);
return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head));
@ -577,9 +585,9 @@ public:
m_cache_in_buffer.erase(sizeof(bucket_head2)); m_cache_in_buffer.erase(sizeof(bucket_head2));
m_state = stream_state_body; m_state = stream_state_body;
m_oponent_protocol_ver = m_current_head.m_protocol_version; m_oponent_protocol_ver = m_current_head.m_protocol_version;
if(m_current_head.m_cb > m_config.m_max_packet_size) if(m_current_head.m_cb > max_packet_size)
{ {
LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << max_packet_size
<< ", packet header received " << m_current_head.m_cb << ", packet header received " << m_current_head.m_cb
<< ", connection will be closed."); << ", connection will be closed.");
return false; return false;
@ -634,6 +642,9 @@ public:
boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock);
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
{ {
LOG_ERROR_CC(m_connection_context, "Failed to do_send"); LOG_ERROR_CC(m_connection_context, "Failed to do_send");
@ -675,6 +686,9 @@ public:
boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
{ {
LOG_ERROR_CC(m_connection_context, "Failed to send request"); LOG_ERROR_CC(m_connection_context, "Failed to send request");

View file

@ -55,6 +55,8 @@ namespace cryptonote
state_normal state_normal
}; };
bool handshake_complete() const noexcept { return m_state != state_before_handshake; }
state m_state; state m_state;
std::vector<std::pair<crypto::hash, uint64_t>> m_needed_objects; std::vector<std::pair<crypto::hash, uint64_t>> m_needed_objects;
std::unordered_set<crypto::hash> m_requested_objects; std::unordered_set<crypto::hash> m_requested_objects;

View file

@ -2634,6 +2634,7 @@ skip:
std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> fullConnections, fluffyConnections; std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> fullConnections, fluffyConnections;
m_p2p->for_each_connection([this, &exclude_context, &fullConnections, &fluffyConnections](connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags) m_p2p->for_each_connection([this, &exclude_context, &fullConnections, &fluffyConnections](connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)
{ {
// peer_id also filters out connections before handshake
if (peer_id && exclude_context.m_connection_id != context.m_connection_id && context.m_remote_address.get_zone() == epee::net_utils::zone::public_) if (peer_id && exclude_context.m_connection_id != context.m_connection_id && context.m_remote_address.get_zone() == epee::net_utils::zone::public_)
{ {
if(m_core.fluffy_blocks_enabled() && (support_flags & P2P_SUPPORT_FLAG_FLUFFY_BLOCKS)) if(m_core.fluffy_blocks_enabled() && (support_flags & P2P_SUPPORT_FLAG_FLUFFY_BLOCKS))

View file

@ -442,7 +442,7 @@ namespace levin
zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context) zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context)
{ {
// When i2p/tor, only fluff to outbound connections // When i2p/tor, only fluff to outbound connections
if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) if (context.handshake_complete() && source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income))
{ {
if (context.fluff_txs.empty()) if (context.fluff_txs.empty())
context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());

View file

@ -118,6 +118,8 @@ namespace nodetool
m_in_timedsync(false) m_in_timedsync(false)
{} {}
static constexpr int handshake_command() noexcept { return 1001; }
std::vector<cryptonote::blobdata> fluff_txs; std::vector<cryptonote::blobdata> fluff_txs;
std::chrono::steady_clock::time_point flush_time; std::chrono::steady_clock::time_point flush_time;
peerid_type peer_id; peerid_type peer_id;
@ -139,6 +141,7 @@ namespace nodetool
typedef COMMAND_HANDSHAKE_T<typename t_payload_net_handler::payload_type> COMMAND_HANDSHAKE; typedef COMMAND_HANDSHAKE_T<typename t_payload_net_handler::payload_type> COMMAND_HANDSHAKE;
typedef COMMAND_TIMED_SYNC_T<typename t_payload_net_handler::payload_type> COMMAND_TIMED_SYNC; typedef COMMAND_TIMED_SYNC_T<typename t_payload_net_handler::payload_type> COMMAND_TIMED_SYNC;
static_assert(p2p_connection_context::handshake_command() == COMMAND_HANDSHAKE::ID, "invalid handshake command id");
typedef epee::net_utils::boosted_tcp_server<epee::levin::async_protocol_handler<p2p_connection_context>> net_server; typedef epee::net_utils::boosted_tcp_server<epee::levin::async_protocol_handler<p2p_connection_context>> net_server;

View file

@ -52,6 +52,8 @@ namespace
struct test_levin_connection_context : public epee::net_utils::connection_context_base struct test_levin_connection_context : public epee::net_utils::connection_context_base
{ {
static constexpr int handshake_command() noexcept { return 1001; }
static constexpr bool handshake_complete() noexcept { return true; }
}; };
typedef epee::levin::async_protocol_handler_config<test_levin_connection_context> test_levin_protocol_handler_config; typedef epee::levin::async_protocol_handler_config<test_levin_connection_context> test_levin_protocol_handler_config;

View file

@ -48,6 +48,8 @@ namespace net_load_tests
struct test_connection_context : epee::net_utils::connection_context_base struct test_connection_context : epee::net_utils::connection_context_base
{ {
test_connection_context(): epee::net_utils::connection_context_base(boost::uuids::nil_uuid(), {}, false, false), m_closed(false) {} test_connection_context(): epee::net_utils::connection_context_base(boost::uuids::nil_uuid(), {}, false, false), m_closed(false) {}
static constexpr int handshake_command() noexcept { return 1001; }
static constexpr bool handshake_complete() noexcept { return true; }
volatile bool m_closed; volatile bool m_closed;
}; };

View file

@ -43,6 +43,8 @@ namespace
{ {
struct test_levin_connection_context : public epee::net_utils::connection_context_base struct test_levin_connection_context : public epee::net_utils::connection_context_base
{ {
static constexpr int handshake_command() noexcept { return 1001; }
static constexpr bool handshake_complete() noexcept { return true; }
}; };
typedef epee::levin::async_protocol_handler_config<test_levin_connection_context> test_levin_protocol_handler_config; typedef epee::levin::async_protocol_handler_config<test_levin_connection_context> test_levin_protocol_handler_config;
@ -194,6 +196,7 @@ namespace
{ {
m_handler_config.set_handler(m_pcommands_handler, [](epee::levin::levin_commands_handler<test_levin_connection_context> *handler) { delete handler; }); m_handler_config.set_handler(m_pcommands_handler, [](epee::levin::levin_commands_handler<test_levin_connection_context> *handler) { delete handler; });
m_handler_config.m_invoke_timeout = invoke_timeout; m_handler_config.m_invoke_timeout = invoke_timeout;
m_handler_config.m_initial_max_packet_size = max_packet_size;
m_handler_config.m_max_packet_size = max_packet_size; m_handler_config.m_max_packet_size = max_packet_size;
} }

View file

@ -178,6 +178,7 @@ namespace
{ {
using base_type = epee::net_utils::connection_context_base; using base_type = epee::net_utils::connection_context_base;
static_cast<base_type&>(context_) = base_type{random_generator(), {}, is_incoming, false}; static_cast<base_type&>(context_) = base_type{random_generator(), {}, is_incoming, false};
context_.m_state = cryptonote::cryptonote_connection_context::state_normal;
handler_.after_init_connection(); handler_.after_init_connection();
} }