network: log traffic and add a simple traffic analysis script

This commit is contained in:
moneromooo-monero 2019-11-22 13:43:44 +00:00
parent 59e7d5686b
commit 56a4469ef3
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
7 changed files with 155 additions and 27 deletions

View file

@ -34,10 +34,28 @@
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net"
namespace
{
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
{
MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
<< " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
}
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
char buf[32];
snprintf(buf, sizeof(buf), "command-%u", command);
return on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
}
namespace epee
{
namespace net_utils
{
#if 0
template<class t_arg, class t_result, class t_transport>
bool invoke_remote_command2(int command, const t_arg& out_struct, t_result& result_struct, t_transport& transport)
{
@ -83,16 +101,18 @@ namespace epee
}
return true;
}
#endif
template<class t_arg, class t_result, class t_transport>
bool invoke_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg& out_struct, t_result& result_struct, t_transport& transport)
bool invoke_remote_command2(const epee::net_utils::connection_context_base context, int command, const t_arg& out_struct, t_result& result_struct, t_transport& transport)
{
const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg;
out_struct.store(stg);
std::string buff_to_send, buff_to_recv;
stg.store_to_binary(buff_to_send);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command);
int res = transport.invoke(command, buff_to_send, buff_to_recv, conn_id);
if( res <=0 )
{
@ -102,24 +122,30 @@ namespace epee
typename serialization::portable_storage stg_ret;
if(!stg_ret.load_from_binary(buff_to_recv))
{
on_levin_traffic(context, true, false, true, buff_to_recv.size(), command);
LOG_ERROR("Failed to load_from_binary on command " << command);
return false;
}
on_levin_traffic(context, true, false, false, buff_to_recv.size(), command);
return result_struct.load(stg_ret);
}
template<class t_result, class t_arg, class callback_t, class t_transport>
bool async_invoke_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg& out_struct, t_transport& transport, const callback_t &cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke_remote_command2(const epee::net_utils::connection_context_base &context, int command, const t_arg& out_struct, t_transport& transport, const callback_t &cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg;
const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation
std::string buff_to_send;
stg.store_to_binary(buff_to_send);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command);
int res = transport.invoke_async(command, epee::strspan<uint8_t>(buff_to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
{
t_result result_struct = AUTO_VAL_INIT(result_struct);
if( code <=0 )
{
if (!buff.empty())
on_levin_traffic(context, true, false, true, buff.size(), command);
LOG_PRINT_L1("Failed to invoke command " << command << " return code " << code);
cb(code, result_struct, context);
return false;
@ -127,16 +153,19 @@ namespace epee
serialization::portable_storage stg_ret;
if(!stg_ret.load_from_binary(buff))
{
on_levin_traffic(context, true, false, true, buff.size(), command);
LOG_ERROR("Failed to load_from_binary on command " << command);
cb(LEVIN_ERROR_FORMAT, result_struct, context);
return false;
}
if (!result_struct.load(stg_ret))
{
on_levin_traffic(context, true, false, true, buff.size(), command);
LOG_ERROR("Failed to load result struct on command " << command);
cb(LEVIN_ERROR_FORMAT, result_struct, context);
return false;
}
on_levin_traffic(context, true, false, false, buff.size(), command);
cb(code, result_struct, context);
return true;
}, inv_timeout);
@ -149,14 +178,15 @@ namespace epee
}
template<class t_arg, class t_transport>
bool notify_remote_command2(boost::uuids::uuid conn_id, int command, const t_arg& out_struct, t_transport& transport)
bool notify_remote_command2(const typename t_transport::connection_context &context, int command, const t_arg& out_struct, t_transport& transport)
{
const boost::uuids::uuid &conn_id = context.m_connection_id;
serialization::portable_storage stg;
out_struct.store(stg);
std::string buff_to_send;
stg.store_to_binary(buff_to_send);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command);
int res = transport.notify(command, epee::strspan<uint8_t>(buff_to_send), conn_id);
if(res <=0 )
{
@ -173,6 +203,7 @@ namespace epee
serialization::portable_storage strg;
if(!strg.load_from_binary(in_buff))
{
on_levin_traffic(context, false, false, true, in_buff.size(), command);
LOG_ERROR("Failed to load_from_binary in command " << command);
return -1;
}
@ -181,9 +212,11 @@ namespace epee
if (!static_cast<t_in_type&>(in_struct).load(strg))
{
on_levin_traffic(context, false, false, true, in_buff.size(), command);
LOG_ERROR("Failed to load in_struct in command " << command);
return -1;
}
on_levin_traffic(context, false, false, false, in_buff.size(), command);
int res = cb(command, static_cast<t_in_type&>(in_struct), static_cast<t_out_type&>(out_struct), context);
serialization::portable_storage strg_out;
static_cast<t_out_type&>(out_struct).store(strg_out);
@ -193,6 +226,7 @@ namespace epee
LOG_ERROR("Failed to store_to_binary in command" << command);
return -1;
}
on_levin_traffic(context, false, true, false, buff_out.size(), command);
return res;
}
@ -203,15 +237,18 @@ namespace epee
serialization::portable_storage strg;
if(!strg.load_from_binary(in_buff))
{
on_levin_traffic(context, false, false, true, in_buff.size(), command);
LOG_ERROR("Failed to load_from_binary in notify " << command);
return -1;
}
boost::value_initialized<t_in_type> in_struct;
if (!static_cast<t_in_type&>(in_struct).load(strg))
{
on_levin_traffic(context, false, false, true, in_buff.size(), command);
LOG_ERROR("Failed to load in_struct in notify " << command);
return -1;
}
on_levin_traffic(context, false, false, false, in_buff.size(), command);
return cb(command, in_struct, context);
}
@ -296,6 +333,7 @@ namespace epee
#define END_INVOKE_MAP2() \
LOG_ERROR("Unknown command:" << command); \
on_levin_traffic(context, false, false, true, in_buff.size(), "invalid-command"); \
return LEVIN_ERROR_CONNECTION_HANDLER_NOT_DEFINED; \
}
}

View file

@ -44,6 +44,14 @@
#include "net/dandelionpp.h"
#include "p2p/net_node.h"
namespace
{
int get_command_from_message(const cryptonote::blobdata &msg)
{
return msg.size() >= sizeof(epee::levin::bucket_head2) ? SWAP32LE(((epee::levin::bucket_head2*)msg.data())->m_command) : 0;
}
}
namespace cryptonote
{
namespace levin
@ -164,6 +172,10 @@ namespace levin
bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad)
{
const cryptonote::blobdata blob = make_tx_payload(std::move(txs), pad);
p2p.for_connection(destination, [&blob](detail::p2p_context& context) {
on_levin_traffic(context, true, true, false, blob.size(), get_command_from_message(blob));
return true;
});
return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(blob), destination);
}
@ -539,6 +551,10 @@ namespace levin
else
message = zone_->noise.clone();
zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) {
on_levin_traffic(context, true, true, false, message.size(), "noise");
return true;
});
if (zone_->p2p->send(std::move(message), channel.connection))
{
if (!channel.queue.empty() && channel.active.empty())

View file

@ -1024,7 +1024,7 @@ namespace nodetool
epee::simple_event ev;
std::atomic<bool> hsh_result(false);
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_HANDSHAKE::response>(context_.m_connection_id, COMMAND_HANDSHAKE::ID, arg, zone.m_net_server.get_config_object(),
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_HANDSHAKE::response>(context_, COMMAND_HANDSHAKE::ID, arg, zone.m_net_server.get_config_object(),
[this, &pi, &ev, &hsh_result, &just_take_peerlist, &context_](int code, const typename COMMAND_HANDSHAKE::response& rsp, p2p_connection_context& context)
{
epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ev.raise();});
@ -1109,7 +1109,7 @@ namespace nodetool
m_payload_handler.get_payload_sync_data(arg.payload_data);
network_zone& zone = m_network_zones.at(context_.m_remote_address.get_zone());
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_TIMED_SYNC::response>(context_.m_connection_id, COMMAND_TIMED_SYNC::ID, arg, zone.m_net_server.get_config_object(),
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_TIMED_SYNC::response>(context_, COMMAND_TIMED_SYNC::ID, arg, zone.m_net_server.get_config_object(),
[this](int code, const typename COMMAND_TIMED_SYNC::response& rsp, p2p_connection_context& context)
{
context.m_in_timedsync = false;
@ -2216,7 +2216,7 @@ namespace nodetool
network_zone& zone = m_network_zones.at(address.get_zone());
bool inv_call_res = epee::net_utils::async_invoke_remote_command2<COMMAND_PING::response>(ping_context.m_connection_id, COMMAND_PING::ID, req, zone.m_net_server.get_config_object(),
bool inv_call_res = epee::net_utils::async_invoke_remote_command2<COMMAND_PING::response>(ping_context, COMMAND_PING::ID, req, zone.m_net_server.get_config_object(),
[=](int code, const COMMAND_PING::response& rsp, p2p_connection_context& context)
{
if(code <= 0)
@ -2260,7 +2260,7 @@ namespace nodetool
COMMAND_REQUEST_SUPPORT_FLAGS::request support_flags_request;
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_REQUEST_SUPPORT_FLAGS::response>
(
context.m_connection_id,
context,
COMMAND_REQUEST_SUPPORT_FLAGS::ID,
support_flags_request,
m_network_zones.at(epee::net_utils::zone::public_).m_net_server.get_config_object(),

View file

@ -202,11 +202,11 @@ namespace
// Connect to server
std::atomic<int> conn_status(0);
m_cmd_conn_id = boost::uuids::nil_uuid();
m_context = {};
ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_cmd_conn_id = context.m_connection_id;
m_context = context;
}
else
{
@ -217,11 +217,11 @@ namespace
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
ASSERT_FALSE(m_cmd_conn_id.is_nil());
ASSERT_FALSE(m_context.m_connection_id.is_nil());
conn_status.store(0, std::memory_order_seq_cst);
CMD_RESET_STATISTICS::request req;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req,
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_context, CMD_RESET_STATISTICS::ID, req,
m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
conn_status.store(code, std::memory_order_seq_cst);
}));
@ -250,16 +250,16 @@ namespace
// Connect to server and invoke shutdown command
std::atomic<int> conn_status(0);
boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
test_connection_context cmd_context;
tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
cmd_conn_id = context.m_connection_id;
cmd_context = context;
conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
});
if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
if (1 != conn_status.load(std::memory_order_seq_cst)) return;
epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object());
epee::net_utils::notify_remote_command2(cmd_context, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object());
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
}
@ -299,7 +299,7 @@ namespace
{
std::atomic<int> req_status(0);
CMD_GET_STATISTICS::request req;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req,
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_context, CMD_GET_STATISTICS::ID, req,
m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
if (0 < code)
{
@ -338,14 +338,14 @@ namespace
{
CMD_SEND_DATA_REQUESTS::request req;
req.request_size = request_size;
epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
epee::net_utils::notify_remote_command2(m_context, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
}
protected:
test_tcp_server m_tcp_server;
test_levin_commands_handler m_commands_handler;
size_t m_thread_count;
boost::uuids::uuid m_cmd_conn_id;
test_connection_context m_context;
};
}
@ -434,7 +434,7 @@ TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_
// Close connections
CMD_CLOSE_ALL_CONNECTIONS::request req;
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_context, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
// Wait for all opened connections to close
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
@ -455,10 +455,10 @@ TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_
// Close rest connections
m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
if (ctx.m_connection_id != m_cmd_conn_id)
if (ctx.m_connection_id != m_context.m_connection_id)
{
CMD_DATA_REQUEST::request req;
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx, CMD_DATA_REQUEST::ID, req,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{
@ -548,7 +548,7 @@ TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_ser
CMD_START_OPEN_CLOSE_TEST::request req_start;
req_start.open_request_target = CONNECTION_COUNT;
req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_context, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
}));
@ -582,7 +582,7 @@ TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_ser
// Ask server to close rest connections
CMD_CLOSE_ALL_CONNECTIONS::request req;
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_context, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
// Wait for almost all connections to be closed by server
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
@ -601,10 +601,10 @@ TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_ser
// Close rest connections
m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
if (ctx.m_connection_id != m_cmd_conn_id)
if (ctx.m_connection_id != m_context.m_connection_id)
{
CMD_DATA_REQUEST::request req;
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx, CMD_DATA_REQUEST::ID, req,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{

View file

@ -47,6 +47,7 @@ namespace net_load_tests
{
struct test_connection_context : epee::net_utils::connection_context_base
{
test_connection_context(): epee::net_utils::connection_context_base(boost::uuids::nil_uuid(), {}, false, false), m_closed(false) {}
volatile bool m_closed;
};

View file

@ -147,7 +147,7 @@ namespace
CMD_DATA_REQUEST::request req2;
req2.data.resize(req.request_size);
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req2,
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx, CMD_DATA_REQUEST::ID, req2,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{

73
utils/logs/levin-traffic.awk Executable file
View file

@ -0,0 +1,73 @@
#!/bin/awk -f
function max(a, b) { if (a < b) return b; return a; }
function bytes_str(b) { if (b < 1024) return b " bytes"; if (b < 1024 * 1024) return b/1024 " kB"; return b/1024/1024 " MB"; }
function time_str(b) { if (b < 120) return b " sec"; if (b < 3600) return b/60 " min"; if (b < 86400) return b/3600 " hours"; return b/86400 " days"}
BEGIN {
commands["command-1001"] = "HANDSHAKE"
commands["command-1002"] = "TIMED_SYNC"
commands["command-1003"] = "PING"
commands["command-1004"] = "REQUEST_STAT_INFO"
commands["command-1005"] = "REQUEST_NETWORK_STATE"
commands["command-1006"] = "REQUEST_PEER_ID"
commands["command-1007"] = "REQUEST_SUPPORT_FLAGS"
commands["command-2001"] = "NOTIFY_NEW_BLOCK"
commands["command-2002"] = "NOTIFY_NEW_TRANSACTIONS"
commands["command-2003"] = "REQUEST_GET_OBJECTS"
commands["command-2004"] = "RESPONSE_GET_OBJECTS"
commands["command-2006"] = "NOTIFY_REQUEST_CHAIN"
commands["command-2007"] = "RESPONSE_CHAIN_ENTRY"
commands["command-2008"] = "NOTIFY_NEW_FLUFFY_BLOCK"
commands["command-2009"] = "NOTIFY_REQUEST_FLUFFY_MISSING_TX"
}
/ net.p2p.traffic / {
date=gensub(/-/, " ", "g", $1)
time=gensub(/\..*/, "", "g", gensub(/:/, " ", "g", $2))
ip=gensub(/\[/, "", "g", $7)
outin=gensub(/]/, "", "g", $8)
timestamp=date " " time
timestamp=mktime(timestamp)
if (!t0)
t0 = timestamp
if (!t0ip[ip])
t0ip[ip] = timestamp
t1 = timestamp
t1ip[ip] = timestamp
bytes=$9
dir=$11
command=$14
initiator=$17
bytes_by_command[command] += bytes
bytes_by_ip[ip] += bytes
if (dir == "sent")
bytes_sent_by_ip[ip] += bytes
else
bytes_received_by_ip[ip] += bytes
bytes_by_outin[outin] += bytes
bytes_by_direction[dir] += bytes
bytes_by_initiator[initiator] += bytes
}
END {
dt = t1 - t0
print "Running time:", time_str(dt)
for (command in bytes_by_command) {
category = command
if (commands[command])
category = commands[command];
print "Category", category ":", bytes_str(bytes_by_command[command])
}
for (direction in bytes_by_direction) print direction ":", bytes_str(bytes_by_direction[direction])
for (initiator in bytes_by_initiator) print "Initiating from", initiator ":", bytes_str(bytes_by_initiator[initiator])
for (outin in bytes_by_outin) print "With", outin, "peers:", bytes_str(bytes_by_outin[outin])
for (ip in bytes_by_ip) print "IP", ip ":", bytes_str(bytes_by_ip[ip])
print "Download rate:", bytes_str(bytes_by_direction["received"] / max(dt, 1)) "/s"
for (ip in bytes_received_by_ip)
print " ", ip ":", bytes_str(bytes_received_by_ip[ip] / max(t1ip[ip] - t0ip[ip], 1)) "/s over", time_str(t1ip[ip] - t0ip[ip])
print "Upload rate:", bytes_str(bytes_by_direction["sent"] / max(dt, 1)) "/s"
for (ip in bytes_sent_by_ip)
print " ", ip ":", bytes_str(bytes_sent_by_ip[ip] / max(t1ip[ip] - t0ip[ip], 1)) "/s over", time_str(t1ip[ip] - t0ip[ip])
}