diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h index 06eb9bdaf..b18e04a27 100644 --- a/contrib/epee/include/storages/levin_abstract_invoke2.h +++ b/contrib/epee/include/storages/levin_abstract_invoke2.h @@ -34,10 +34,28 @@ #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" +namespace +{ + template + void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category) + { + MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "") + << " for category " << category << " initiated by " << (initiator ? "us" : "peer")); + } + template + void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command) + { + char buf[32]; + snprintf(buf, sizeof(buf), "command-%u", command); + return on_levin_traffic(context, initiator, sent, error, bytes, buf); + } +} + namespace epee { namespace net_utils { +#if 0 template bool invoke_remote_command2(int command, const t_arg& out_struct, t_result& result_struct, t_transport& transport) { @@ -83,16 +101,18 @@ namespace epee } return true; } +#endif template - bool invoke_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg& out_struct, t_result& result_struct, t_transport& transport) + bool invoke_remote_command2(const epee::net_utils::connection_context_base context, int command, const t_arg& out_struct, t_result& result_struct, t_transport& transport) { - + const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; out_struct.store(stg); std::string buff_to_send, buff_to_recv; stg.store_to_binary(buff_to_send); + on_levin_traffic(context, true, true, false, buff_to_send.size(), command); int res = transport.invoke(command, buff_to_send, buff_to_recv, conn_id); if( res <=0 ) { @@ -102,24 +122,30 @@ namespace epee typename serialization::portable_storage stg_ret; if(!stg_ret.load_from_binary(buff_to_recv)) { + on_levin_traffic(context, true, false, true, buff_to_recv.size(), command); LOG_ERROR("Failed to load_from_binary on command " << command); return false; } + on_levin_traffic(context, true, false, false, buff_to_recv.size(), command); return result_struct.load(stg_ret); } template - bool async_invoke_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg& out_struct, t_transport& transport, const callback_t &cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + bool async_invoke_remote_command2(const epee::net_utils::connection_context_base &context, int command, const t_arg& out_struct, t_transport& transport, const callback_t &cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) { + const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; const_cast(out_struct).store(stg);//TODO: add true const support to searilzation std::string buff_to_send; stg.store_to_binary(buff_to_send); + on_levin_traffic(context, true, true, false, buff_to_send.size(), command); int res = transport.invoke_async(command, epee::strspan(buff_to_send), conn_id, [cb, command](int code, const epee::span buff, typename t_transport::connection_context& context)->bool { t_result result_struct = AUTO_VAL_INIT(result_struct); if( code <=0 ) { + if (!buff.empty()) + on_levin_traffic(context, true, false, true, buff.size(), command); LOG_PRINT_L1("Failed to invoke command " << command << " return code " << code); cb(code, result_struct, context); return false; @@ -127,16 +153,19 @@ namespace epee serialization::portable_storage stg_ret; if(!stg_ret.load_from_binary(buff)) { + on_levin_traffic(context, true, false, true, buff.size(), command); LOG_ERROR("Failed to load_from_binary on command " << command); cb(LEVIN_ERROR_FORMAT, result_struct, context); return false; } if (!result_struct.load(stg_ret)) { + on_levin_traffic(context, true, false, true, buff.size(), command); LOG_ERROR("Failed to load result struct on command " << command); cb(LEVIN_ERROR_FORMAT, result_struct, context); return false; } + on_levin_traffic(context, true, false, false, buff.size(), command); cb(code, result_struct, context); return true; }, inv_timeout); @@ -149,14 +178,15 @@ namespace epee } template - bool notify_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg& out_struct, t_transport& transport) + bool notify_remote_command2(const typename t_transport::connection_context &context, int command, const t_arg& out_struct, t_transport& transport) { - + const boost::uuids::uuid &conn_id = context.m_connection_id; serialization::portable_storage stg; out_struct.store(stg); std::string buff_to_send; stg.store_to_binary(buff_to_send); + on_levin_traffic(context, true, true, false, buff_to_send.size(), command); int res = transport.notify(command, epee::strspan(buff_to_send), conn_id); if(res <=0 ) { @@ -173,6 +203,7 @@ namespace epee serialization::portable_storage strg; if(!strg.load_from_binary(in_buff)) { + on_levin_traffic(context, false, false, true, in_buff.size(), command); LOG_ERROR("Failed to load_from_binary in command " << command); return -1; } @@ -181,9 +212,11 @@ namespace epee if (!static_cast(in_struct).load(strg)) { + on_levin_traffic(context, false, false, true, in_buff.size(), command); LOG_ERROR("Failed to load in_struct in command " << command); return -1; } + on_levin_traffic(context, false, false, false, in_buff.size(), command); int res = cb(command, static_cast(in_struct), static_cast(out_struct), context); serialization::portable_storage strg_out; static_cast(out_struct).store(strg_out); @@ -193,6 +226,7 @@ namespace epee LOG_ERROR("Failed to store_to_binary in command" << command); return -1; } + on_levin_traffic(context, false, true, false, buff_out.size(), command); return res; } @@ -203,15 +237,18 @@ namespace epee serialization::portable_storage strg; if(!strg.load_from_binary(in_buff)) { + on_levin_traffic(context, false, false, true, in_buff.size(), command); LOG_ERROR("Failed to load_from_binary in notify " << command); return -1; } boost::value_initialized in_struct; if (!static_cast(in_struct).load(strg)) { + on_levin_traffic(context, false, false, true, in_buff.size(), command); LOG_ERROR("Failed to load in_struct in notify " << command); return -1; } + on_levin_traffic(context, false, false, false, in_buff.size(), command); return cb(command, in_struct, context); } @@ -296,6 +333,7 @@ namespace epee #define END_INVOKE_MAP2() \ LOG_ERROR("Unknown command:" << command); \ + on_levin_traffic(context, false, false, true, in_buff.size(), "invalid-command"); \ return LEVIN_ERROR_CONNECTION_HANDLER_NOT_DEFINED; \ } } diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index a0a4bbbb1..e45c34e02 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -44,6 +44,14 @@ #include "net/dandelionpp.h" #include "p2p/net_node.h" +namespace +{ + int get_command_from_message(const cryptonote::blobdata &msg) + { + return msg.size() >= sizeof(epee::levin::bucket_head2) ? SWAP32LE(((epee::levin::bucket_head2*)msg.data())->m_command) : 0; + } +} + namespace cryptonote { namespace levin @@ -164,6 +172,10 @@ namespace levin bool make_payload_send_txs(connections& p2p, std::vector&& txs, const boost::uuids::uuid& destination, const bool pad) { const cryptonote::blobdata blob = make_tx_payload(std::move(txs), pad); + p2p.for_connection(destination, [&blob](detail::p2p_context& context) { + on_levin_traffic(context, true, true, false, blob.size(), get_command_from_message(blob)); + return true; + }); return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan(blob), destination); } @@ -539,6 +551,10 @@ namespace levin else message = zone_->noise.clone(); + zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) { + on_levin_traffic(context, true, true, false, message.size(), "noise"); + return true; + }); if (zone_->p2p->send(std::move(message), channel.connection)) { if (!channel.queue.empty() && channel.active.empty()) diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index 263cecfa2..89c8602f3 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -1024,7 +1024,7 @@ namespace nodetool epee::simple_event ev; std::atomic hsh_result(false); - bool r = epee::net_utils::async_invoke_remote_command2(context_.m_connection_id, COMMAND_HANDSHAKE::ID, arg, zone.m_net_server.get_config_object(), + bool r = epee::net_utils::async_invoke_remote_command2(context_, COMMAND_HANDSHAKE::ID, arg, zone.m_net_server.get_config_object(), [this, &pi, &ev, &hsh_result, &just_take_peerlist, &context_](int code, const typename COMMAND_HANDSHAKE::response& rsp, p2p_connection_context& context) { epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ev.raise();}); @@ -1109,7 +1109,7 @@ namespace nodetool m_payload_handler.get_payload_sync_data(arg.payload_data); network_zone& zone = m_network_zones.at(context_.m_remote_address.get_zone()); - bool r = epee::net_utils::async_invoke_remote_command2(context_.m_connection_id, COMMAND_TIMED_SYNC::ID, arg, zone.m_net_server.get_config_object(), + bool r = epee::net_utils::async_invoke_remote_command2(context_, COMMAND_TIMED_SYNC::ID, arg, zone.m_net_server.get_config_object(), [this](int code, const typename COMMAND_TIMED_SYNC::response& rsp, p2p_connection_context& context) { context.m_in_timedsync = false; @@ -2216,7 +2216,7 @@ namespace nodetool network_zone& zone = m_network_zones.at(address.get_zone()); - bool inv_call_res = epee::net_utils::async_invoke_remote_command2(ping_context.m_connection_id, COMMAND_PING::ID, req, zone.m_net_server.get_config_object(), + bool inv_call_res = epee::net_utils::async_invoke_remote_command2(ping_context, COMMAND_PING::ID, req, zone.m_net_server.get_config_object(), [=](int code, const COMMAND_PING::response& rsp, p2p_connection_context& context) { if(code <= 0) @@ -2260,7 +2260,7 @@ namespace nodetool COMMAND_REQUEST_SUPPORT_FLAGS::request support_flags_request; bool r = epee::net_utils::async_invoke_remote_command2 ( - context.m_connection_id, + context, COMMAND_REQUEST_SUPPORT_FLAGS::ID, support_flags_request, m_network_zones.at(epee::net_utils::zone::public_).m_net_server.get_config_object(), diff --git a/tests/net_load_tests/clt.cpp b/tests/net_load_tests/clt.cpp index fc2280f23..e154363e7 100644 --- a/tests/net_load_tests/clt.cpp +++ b/tests/net_load_tests/clt.cpp @@ -202,11 +202,11 @@ namespace // Connect to server std::atomic conn_status(0); - m_cmd_conn_id = boost::uuids::nil_uuid(); + m_context = {}; ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) { if (!ec) { - m_cmd_conn_id = context.m_connection_id; + m_context = context; } else { @@ -217,11 +217,11 @@ namespace EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out"; ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst)); - ASSERT_FALSE(m_cmd_conn_id.is_nil()); + ASSERT_FALSE(m_context.m_connection_id.is_nil()); conn_status.store(0, std::memory_order_seq_cst); CMD_RESET_STATISTICS::request req; - ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req, + ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2(m_context, CMD_RESET_STATISTICS::ID, req, m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) { conn_status.store(code, std::memory_order_seq_cst); })); @@ -250,16 +250,16 @@ namespace // Connect to server and invoke shutdown command std::atomic conn_status(0); - boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid(); + test_connection_context cmd_context; tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) { - cmd_conn_id = context.m_connection_id; + cmd_context = context; conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst); }); if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return; if (1 != conn_status.load(std::memory_order_seq_cst)) return; - epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object()); + epee::net_utils::notify_remote_command2(cmd_context, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object()); busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); }); } @@ -299,7 +299,7 @@ namespace { std::atomic req_status(0); CMD_GET_STATISTICS::request req; - ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req, + ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2(m_context, CMD_GET_STATISTICS::ID, req, m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) { if (0 < code) { @@ -338,14 +338,14 @@ namespace { CMD_SEND_DATA_REQUESTS::request req; req.request_size = request_size; - epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object()); + epee::net_utils::notify_remote_command2(m_context, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object()); } protected: test_tcp_server m_tcp_server; test_levin_commands_handler m_commands_handler; size_t m_thread_count; - boost::uuids::uuid m_cmd_conn_id; + test_connection_context m_context; }; } @@ -434,7 +434,7 @@ TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_ // Close connections CMD_CLOSE_ALL_CONNECTIONS::request req; - ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object())); + ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_context, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object())); // Wait for all opened connections to close busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }); @@ -455,10 +455,10 @@ TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_ // Close rest connections m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) { - if (ctx.m_connection_id != m_cmd_conn_id) + if (ctx.m_connection_id != m_context.m_connection_id) { CMD_DATA_REQUEST::request req; - bool r = epee::net_utils::async_invoke_remote_command2(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req, + bool r = epee::net_utils::async_invoke_remote_command2(ctx, CMD_DATA_REQUEST::ID, req, m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) { if (code <= 0) { @@ -548,7 +548,7 @@ TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_ser CMD_START_OPEN_CLOSE_TEST::request req_start; req_start.open_request_target = CONNECTION_COUNT; req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT; - ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start, + ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2(m_context, CMD_START_OPEN_CLOSE_TEST::ID, req_start, m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) { test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst); })); @@ -582,7 +582,7 @@ TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_ser // Ask server to close rest connections CMD_CLOSE_ALL_CONNECTIONS::request req; - ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object())); + ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_context, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object())); // Wait for almost all connections to be closed by server busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }); @@ -601,10 +601,10 @@ TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_ser // Close rest connections m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) { - if (ctx.m_connection_id != m_cmd_conn_id) + if (ctx.m_connection_id != m_context.m_connection_id) { CMD_DATA_REQUEST::request req; - bool r = epee::net_utils::async_invoke_remote_command2(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req, + bool r = epee::net_utils::async_invoke_remote_command2(ctx, CMD_DATA_REQUEST::ID, req, m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) { if (code <= 0) { diff --git a/tests/net_load_tests/net_load_tests.h b/tests/net_load_tests/net_load_tests.h index cdc2d267a..4a76f2ec6 100644 --- a/tests/net_load_tests/net_load_tests.h +++ b/tests/net_load_tests/net_load_tests.h @@ -47,6 +47,7 @@ namespace net_load_tests { struct test_connection_context : epee::net_utils::connection_context_base { + test_connection_context(): epee::net_utils::connection_context_base(boost::uuids::nil_uuid(), {}, false, false), m_closed(false) {} volatile bool m_closed; }; diff --git a/tests/net_load_tests/srv.cpp b/tests/net_load_tests/srv.cpp index fe32ec5cb..b42b1e1b0 100644 --- a/tests/net_load_tests/srv.cpp +++ b/tests/net_load_tests/srv.cpp @@ -147,7 +147,7 @@ namespace CMD_DATA_REQUEST::request req2; req2.data.resize(req.request_size); - bool r = epee::net_utils::async_invoke_remote_command2(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req2, + bool r = epee::net_utils::async_invoke_remote_command2(ctx, CMD_DATA_REQUEST::ID, req2, m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) { if (code <= 0) { diff --git a/utils/logs/levin-traffic.awk b/utils/logs/levin-traffic.awk new file mode 100755 index 000000000..25c1c3ed1 --- /dev/null +++ b/utils/logs/levin-traffic.awk @@ -0,0 +1,73 @@ +#!/bin/awk -f + +function max(a, b) { if (a < b) return b; return a; } +function bytes_str(b) { if (b < 1024) return b " bytes"; if (b < 1024 * 1024) return b/1024 " kB"; return b/1024/1024 " MB"; } +function time_str(b) { if (b < 120) return b " sec"; if (b < 3600) return b/60 " min"; if (b < 86400) return b/3600 " hours"; return b/86400 " days"} + +BEGIN { +commands["command-1001"] = "HANDSHAKE" +commands["command-1002"] = "TIMED_SYNC" +commands["command-1003"] = "PING" +commands["command-1004"] = "REQUEST_STAT_INFO" +commands["command-1005"] = "REQUEST_NETWORK_STATE" +commands["command-1006"] = "REQUEST_PEER_ID" +commands["command-1007"] = "REQUEST_SUPPORT_FLAGS" +commands["command-2001"] = "NOTIFY_NEW_BLOCK" +commands["command-2002"] = "NOTIFY_NEW_TRANSACTIONS" +commands["command-2003"] = "REQUEST_GET_OBJECTS" +commands["command-2004"] = "RESPONSE_GET_OBJECTS" +commands["command-2006"] = "NOTIFY_REQUEST_CHAIN" +commands["command-2007"] = "RESPONSE_CHAIN_ENTRY" +commands["command-2008"] = "NOTIFY_NEW_FLUFFY_BLOCK" +commands["command-2009"] = "NOTIFY_REQUEST_FLUFFY_MISSING_TX" +} + +/ net.p2p.traffic / { + date=gensub(/-/, " ", "g", $1) + time=gensub(/\..*/, "", "g", gensub(/:/, " ", "g", $2)) + ip=gensub(/\[/, "", "g", $7) + outin=gensub(/]/, "", "g", $8) + timestamp=date " " time + timestamp=mktime(timestamp) + if (!t0) + t0 = timestamp + if (!t0ip[ip]) + t0ip[ip] = timestamp + t1 = timestamp + t1ip[ip] = timestamp + bytes=$9 + dir=$11 + command=$14 + initiator=$17 + + bytes_by_command[command] += bytes + bytes_by_ip[ip] += bytes + if (dir == "sent") + bytes_sent_by_ip[ip] += bytes + else + bytes_received_by_ip[ip] += bytes + bytes_by_outin[outin] += bytes + bytes_by_direction[dir] += bytes + bytes_by_initiator[initiator] += bytes +} + +END { + dt = t1 - t0 + print "Running time:", time_str(dt) + for (command in bytes_by_command) { + category = command + if (commands[command]) + category = commands[command]; + print "Category", category ":", bytes_str(bytes_by_command[command]) + } + for (direction in bytes_by_direction) print direction ":", bytes_str(bytes_by_direction[direction]) + for (initiator in bytes_by_initiator) print "Initiating from", initiator ":", bytes_str(bytes_by_initiator[initiator]) + for (outin in bytes_by_outin) print "With", outin, "peers:", bytes_str(bytes_by_outin[outin]) + for (ip in bytes_by_ip) print "IP", ip ":", bytes_str(bytes_by_ip[ip]) + print "Download rate:", bytes_str(bytes_by_direction["received"] / max(dt, 1)) "/s" + for (ip in bytes_received_by_ip) + print " ", ip ":", bytes_str(bytes_received_by_ip[ip] / max(t1ip[ip] - t0ip[ip], 1)) "/s over", time_str(t1ip[ip] - t0ip[ip]) + print "Upload rate:", bytes_str(bytes_by_direction["sent"] / max(dt, 1)) "/s" + for (ip in bytes_sent_by_ip) + print " ", ip ":", bytes_str(bytes_sent_by_ip[ip] / max(t1ip[ip] - t0ip[ip], 1)) "/s over", time_str(t1ip[ip] - t0ip[ip]) +}