2014 network limit 1.1 +utils +toc -doc -drmonero

Update of the PR with network limits

works very well for all speeds
(but remember that low download speed can stop upload
because we then slow down downloading of blockchain
requests too)

more debug options

fixed pedantic warnings in our code
should work again on Mac OS X and FreeBSD
fixed warning about size_t
tested on Debian, Ubuntu, Windows(testing now)

TCP options and ToS (QoS) flag
FIXED peer number limit
FIXED some spikes in ingress/download
FIXED problems when other up and down limit
This commit is contained in:
rfree2monero 2015-02-12 20:59:39 +01:00
parent eabb519605
commit 5ce4256e3d
30 changed files with 714 additions and 383 deletions

View file

@ -56,6 +56,7 @@
#include "syncobj.h"
#include "../../../../src/p2p/connection_basic.hpp"
#include "../../../../contrib/otshell_utils/utils.hpp"
#include "../../../../src/p2p/network_throttle-detail.hpp"
#define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000
@ -130,6 +131,7 @@ namespace net_utils
/// Buffer for incoming data.
boost::array<char, 8192> buffer_;
//boost::array<char, 1024> buffer_;
t_connection_context context;
i_connection_filter* &m_pfilter;
@ -143,6 +145,12 @@ namespace net_utils
critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk()
t_server_role m_connection_type;
// for calculate speed (last 60 sec)
network_throttle m_throttle_speed_in;
network_throttle m_throttle_speed_out;
std::mutex m_throttle_speed_in_mutex;
std::mutex m_throttle_speed_out_mutex;
public:
void setRPcStation();
@ -285,6 +293,7 @@ namespace net_utils
critical_section m_threads_lock;
volatile uint32_t m_thread_index; // TODO change to std::atomic
t_server_role type;
void detach_threads();
/// The next connection to be accepted
connection_ptr new_connection_;

View file

@ -52,6 +52,7 @@
#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal()
#include "../../../../contrib/otshell_utils/utils.hpp"
#include "../../../../src/p2p/data_logger.hpp"
using namespace nOT::nUtils; // TODO
PRAGMA_WARNING_PUSH
@ -75,7 +76,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
connection_basic(io_service, ref_sock_count, sock_number),
m_protocol_handler(this, config, context),
m_pfilter( pfilter ),
m_connection_type(NET)
m_connection_type(NET),
m_throttle_speed_in("speed_in", "throttle_speed_in"),
m_throttle_speed_out("speed_out", "throttle_speed_out")
{
_info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type);
}
@ -162,6 +165,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
socket_.set_option( optionTos );
//_dbg1("Set ToS flag to " << tos);
boost::asio::ip::tcp::no_delay noDelayOption(false);
socket_.set_option(noDelayOption);
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@ -240,6 +246,32 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (!e)
{
{
CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex);
m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
}
{
CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024);
}
double delay=0; // will be calculated
do
{
{ //_scope_dbg1("CRITICAL_REGION_LOCAL");
CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global
}
delay *= 0.5;
if (delay > 0) {
long int ms = (long int)(delay * 100);
epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms);
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
} while(delay > 0);
//_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
@ -398,6 +430,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
return false;
if(m_was_shutdown)
return false;
{
CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
m_throttle_speed_out.handle_trafic_exact(cb);
context.m_current_speed_up = m_throttle_speed_out.get_current_speed();
}
//_info("[sock " << socket_.native_handle() << "] SEND " << cb);
context.m_last_send = time(NULL);
@ -405,8 +442,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//some data should be wrote to stream
//request complete
do_send_handler_start( ptr , cb ); // (((H)))
sleep_before_packet(cb, 1, 1);
epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical ***
long int retry=0;
const long int retry_limit = 5*4;
@ -440,8 +476,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{ // active operation should be in progress, nothing to do, just wait last operation callback
auto size_now = cb;
_info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
do_send_handler_delayed( ptr , size_now ); // (((H)))
//do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
else
{ // no active operation
@ -463,11 +500,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//)
);
//_dbg3("(chunk): " << size_now);
logger_handle_net_write(size_now);
//logger_handle_net_write(size_now);
//_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
do_send_handler_stop( ptr , cb );
//do_send_handler_stop( ptr , cb ); // empty function
return true;
@ -516,7 +553,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
shutdown();
return;
}
logger_handle_net_write(cb);
sleep_before_packet(cb, 1, 1);
bool do_shutdown = false;
CRITICAL_REGION_BEGIN(m_send_que_lock);
if(m_send_que.empty())
@ -545,7 +583,6 @@ PRAGMA_WARNING_DISABLE_VS(4355)
// )
);
//_dbg3("(normal)" << size_now);
logger_handle_net_write(size_now);
}
CRITICAL_REGION_END();
@ -784,6 +821,10 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
{
if (::cryptonote::core::get_fast_exit() == true)
{
detach_threads();
}
m_stop_signal_sent = true;
TRY_ENTRY();
io_service_.stop();
@ -988,6 +1029,15 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::detach_threads()
{
for (auto thread : m_threads)
thread->detach();
}
} // namespace
} // namespace
PRAGMA_WARNING_POP

View file

@ -81,7 +81,7 @@ public:
async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE)
{}
void del_connections(size_t count);
void del_out_connections(size_t count);
};
@ -670,10 +670,30 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::del_connections(size_t count) // TODO
void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count)
{
std::vector <boost::uuids::uuid> out_connections;
CRITICAL_REGION_BEGIN(m_connects_lock);
m_connects.clear();
for (auto& c: m_connects)
{
if (!c.second->m_connection_context.m_is_income)
out_connections.push_back(c.first);
}
if (out_connections.size() == 0)
return;
// close random out connections
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed));
while (count > 0 && out_connections.size() > 0)
{
close(*out_connections.begin());
del_connection(m_connects.at(*out_connections.begin()));
out_connections.erase(out_connections.begin());
--count;
}
CRITICAL_REGION_END();
}
//------------------------------------------------------------------------------------------

View file

@ -55,6 +55,8 @@ namespace net_utils
time_t m_last_send;
uint64_t m_recv_cnt;
uint64_t m_send_cnt;
double m_current_speed_down;
double m_current_speed_up;
connection_context_base(boost::uuids::uuid connection_id,
long remote_ip, int remote_port, bool is_income,
@ -68,7 +70,9 @@ namespace net_utils
m_last_recv(last_recv),
m_last_send(last_send),
m_recv_cnt(recv_cnt),
m_send_cnt(send_cnt)
m_send_cnt(send_cnt),
m_current_speed_down(0),
m_current_speed_up(0)
{}
connection_context_base(): m_connection_id(),
@ -79,7 +83,9 @@ namespace net_utils
m_last_recv(0),
m_last_send(0),
m_recv_cnt(0),
m_send_cnt(0)
m_send_cnt(0),
m_current_speed_down(0),
m_current_speed_up(0)
{}
connection_context_base& operator=(const connection_context_base& a)

View file

@ -185,7 +185,7 @@ namespace epee
}
return res;
};
}
template<class t_owner, class t_in_type, class t_context, class callback_t>
int buff_to_t_adapter(t_owner* powner, int command, const std::string& in_buff, callback_t cb, t_context& context)
@ -199,7 +199,7 @@ namespace epee
boost::value_initialized<t_in_type> in_struct;
static_cast<t_in_type&>(in_struct).load(strg);
return cb(command, in_struct, context);
};
}
#define CHAIN_LEVIN_INVOKE_MAP2(context_type) \
int invoke(int command, const std::string& in_buff, std::string& buff_out, context_type& context) \

View file

@ -7,7 +7,7 @@
namespace nOT {
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
// (no debug - this is the default)
// +nodebug (no debug)
@ -64,6 +64,6 @@ void cRunOptions::Normalize() {
cRunOptions gRunOptions; // (extern)
}; // namespace OT
} // namespace OT

View file

@ -10,7 +10,7 @@ Template for new files, replace word "template" and later delete this line here.
namespace nOT {
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
/** Global options to run this program main() Eg used for developer's special options like +setdemo +setdebug.
This is NOT for all the other options that are parsed and executed by program. */
@ -50,7 +50,7 @@ class cRunOptions {
extern cRunOptions gRunOptions;
}; // namespace nOT
} // namespace nOT

View file

@ -26,8 +26,7 @@
#elif defined(__unix__) || defined(__posix) || defined(__linux) || defined(__darwin) || defined(__APPLE__) || defined(__clang__)
#define OS_TYPE_POSIX
#else
#warning "Compiler/OS platform is not recognized"
#warning "Just assuming it will work as POSIX then"
#warning "Compiler/OS platform is not recognized. Just assuming it will work as POSIX then"
#define OS_TYPE_POSIX
#endif
@ -44,7 +43,7 @@
namespace nOT {
namespace nUtils {
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
myexception::myexception(const char * what)
: std::runtime_error(what)
@ -78,26 +77,37 @@ std::string & trim(std::string &s) {
return ltrim(rtrim(s));
}
std::string get_current_time()
{
std::string get_current_time() {
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
time_t time_now = std::chrono::system_clock::to_time_t(now);
std::chrono::high_resolution_clock::duration duration = now.time_since_epoch();
int64_t micro = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
// std::localtime() - This function may not be thread-safe.
#ifdef OS_TYPE_WINDOWS
struct tm * tm_pointer = std::localtime( &time_now ); // thread-safe on mingw-w64 (thread local variable) and on MSVC btw
// http://stackoverflow.com/questions/18551409/localtime-r-support-on-mingw
// tm_pointer points to thread-local data, memory is owned/managed by the system/library
#else
// linux, freebsd, have this
struct tm tm_object; // automatic storage duration http://en.cppreference.com/w/cpp/language/storage_duration
struct tm * tm_pointer = & tm_object; // just point to our data
auto x = localtime_r( &time_now , tm_pointer ); // modifies our own (this thread) data in tm_object, this is safe http://linux.die.net/man/3/localtime_r
if (x != tm_pointer) return "(internal error in get_current_time)"; // redundant check in case of broken implementation of localtime_r
#endif
// tm_pointer now points to proper time data, and that memory is automatically managed
if (!tm_pointer) return "(internal error in get_current_time - NULL)"; // redundant check in case of broken implementation of used library methods
std::stringstream stream;
struct tm * date;
std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
time_t time_now;
time_now = std::chrono::high_resolution_clock::to_time_t(now);
date = std::localtime(& time_now);
char date_buff[32];
std::strftime(date_buff, sizeof(date_buff), "%d-%b-%Y %H:%M:%S.", date);
stream << date_buff;
std::chrono::high_resolution_clock::duration duration = now.time_since_epoch();
int64_t micro = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
micro %= 1000000;
stream << std::setfill('0') << std::setw(3) << micro;
return stream.str();
stream << std::setfill('0')
<< std::setw(2) << tm_pointer->tm_year+1900
<< '-' << std::setw(2) << tm_pointer->tm_mon+1
<< '-' << std::setw(2) << tm_pointer->tm_mday
<< ' ' << std::setw(2) << tm_pointer->tm_hour
<< ':' << std::setw(2) << tm_pointer->tm_min
<< ':' << std::setw(2) << tm_pointer->tm_sec
<< '.' << std::setw(6) << (micro%1000000); // 6 because microseconds
return stream.str();
}
cNullstream g_nullstream; // extern a stream that does nothing (eats/discards data)
@ -213,7 +223,7 @@ void cDebugScopeGuard::Assign(const string &chan, const int level, const string
mMsg=msg;
}
}; // namespace nDetail
} // namespace nDetail
// ====================================================================
@ -591,10 +601,10 @@ string stringToColor(const string &hash) {
// algorthms
}; // namespace nUtil
} // namespace nUtil
}; // namespace OT
} // namespace OT
// global namespace

View file

@ -14,7 +14,7 @@
#endif
#ifndef CFG_WITH_TERMCOLORS
#error "You requested to turn off terminal colors (CFG_WITH_TERMCOLORS), however currently they are hardcoded (this option to turn them off is not yet implemented)."
//#error "You requested to turn off terminal colors (CFG_WITH_TERMCOLORS), however currently they are hardcoded (this option to turn them off is not yet implemented)."
#endif
///Macros related to automatic deduction of class name etc;
@ -35,7 +35,7 @@ class myexception : public std::runtime_error {
};
/// @macro Use this macro INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 as a shortcut for various using std::string etc.
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
// ======================================================================================
/// text trimming functions (they do mutate the passes string); they trim based on std::isspace. also return it's reference again
@ -87,6 +87,7 @@ extern std::mutex gLoggerGuard;
#define _warn(VAR) _debug_level( 90,VAR) // some problem
#define _erro(VAR) _debug_level(100,VAR) // error - report
#define _dbg3_c(C,VAR) _debug_level_c(C, 20,VAR)
#define _dbg2_c(C,VAR) _debug_level_c(C, 30,VAR)
#define _dbg1_c(C,VAR) _debug_level_c(C, 40,VAR) // details
@ -140,7 +141,7 @@ class cDebugScopeGuard {
const char* DbgShortenCodeFileName(const char *s); ///< Returns a pointer to some part of the string that was given, skipping directory names, for log/debug
}; // namespace nDetail
} // namespace nDetail
// ========== logger ==========
@ -423,9 +424,9 @@ class value_init {
template <class T, T INIT>
value_init<T, INIT>::value_init() : data(INIT) { }
}; // namespace nUtils
} // namespace nUtils
}; // namespace nOT
} // namespace nOT
// global namespace