/**
@file
@author from CryptoNote (see copyright below; Andrey N. Sabelnikov)
@monero rfree
@brief the connection templated-class for one peer connection
*/
// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of the Andrey N. Sabelnikov nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// #include #include #include #include #include #include // TODO #include // TODO #include #include #include "warnings.h" #include "string_tools_lexical.h" #include "misc_language.h" #include #include #include #include #include #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" #define AGGRESSIVE_TIMEOUT_THRESHOLD 120 // sockets #define NEW_CONNECTION_TIMEOUT_LOCAL 1200000 // 2 minutes #define NEW_CONNECTION_TIMEOUT_REMOTE 10000 // 10 seconds #define DEFAULT_TIMEOUT_MS_LOCAL 1800000 // 30 minutes #define DEFAULT_TIMEOUT_MS_REMOTE 300000 // 5 minutes #define TIMEOUT_EXTRA_MS_PER_BYTE 0.2 namespace epee { namespace net_utils { template T& check_and_get(std::shared_ptr& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; } /************************************************************************/ /* */ /************************************************************************/ template unsigned int connection::host_count(int delta) { static lock_t hosts_mutex; lock_guard_t guard(hosts_mutex); static std::map hosts; unsigned int &val = hosts[host]; if (delta > 0) MTRACE("New connection from host " << host << ": " << val); else if (delta < 0) MTRACE("Closed connection from host " << host << ": " << val); CHECK_AND_ASSERT_THROW_MES(delta >= 0 || val >= (unsigned)-delta, "Count would go negative"); CHECK_AND_ASSERT_THROW_MES(delta <= 0 || val <= std::numeric_limits::max() - (unsigned)delta, "Count would wrap"); val += delta; return val; } template typename connection::duration_t connection::get_default_timeout() { unsigned count{}; try { count = host_count(); } catch (...) {} const unsigned shift = ( connection_basic::get_state().sock_count > AGGRESSIVE_TIMEOUT_THRESHOLD ? std::min(std::max(count, 1u) - 1, 8u) : 0 ); return ( local ? 
std::chrono::milliseconds(DEFAULT_TIMEOUT_MS_LOCAL >> shift) : std::chrono::milliseconds(DEFAULT_TIMEOUT_MS_REMOTE >> shift) ); } template typename connection::duration_t connection::get_timeout_from_bytes_read(size_t bytes) const { return std::chrono::duration_cast::duration_t>( std::chrono::duration( bytes * TIMEOUT_EXTRA_MS_PER_BYTE ) ); } template void connection::start_timer(duration_t duration, bool add) { if (state.timers.general.wait_expire) { state.timers.general.cancel_expire = true; state.timers.general.reset_expire = true; ec_t ec; timers.general.expires_from_now( std::min( duration + (add ? timers.general.expires_from_now() : duration_t{}), get_default_timeout() ), ec ); } else { ec_t ec; timers.general.expires_from_now( std::min( duration + (add ? timers.general.expires_from_now() : duration_t{}), get_default_timeout() ), ec ); async_wait_timer(); } } template void connection::async_wait_timer() { if (state.timers.general.wait_expire) return; state.timers.general.wait_expire = true; auto self = connection::shared_from_this(); timers.general.async_wait([this, self](const ec_t & ec){ lock_guard_t guard(state.lock); state.timers.general.wait_expire = false; if (state.timers.general.cancel_expire) { state.timers.general.cancel_expire = false; if (state.timers.general.reset_expire) { state.timers.general.reset_expire = false; async_wait_timer(); } else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) terminate(); }); } template void connection::cancel_timer() { if (not state.timers.general.wait_expire) return; state.timers.general.cancel_expire = true; state.timers.general.reset_expire = false; ec_t ec; timers.general.cancel(ec); } template void connection::start_handshake() { if (state.socket.wait_handshake) return; static_assert( epee::net_utils::get_ssl_magic_size() <= 
sizeof(state.data.read.buffer), "" ); auto self = connection::shared_from_this(); if (not state.ssl.forced and not state.ssl.detected) { state.socket.wait_read = true; boost::asio::async_read( connection_basic::socket_.next_layer(), boost::asio::buffer( state.data.read.buffer.data(), state.data.read.buffer.size() ), boost::asio::transfer_exactly(epee::net_utils::get_ssl_magic_size()), strand.wrap( [this, self](const ec_t &ec, size_t bytes_transferred){ lock_guard_t guard(state.lock); state.socket.wait_read = false; if (state.socket.cancel_read) { state.socket.cancel_read = false; if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) { terminate(); } else if ( not epee::net_utils::is_ssl( static_cast( state.data.read.buffer.data() ), bytes_transferred ) ) { state.ssl.enabled = false; state.socket.handle_read = true; connection_basic::strand_.post( [this, self, bytes_transferred]{ bool success = handler.handle_recv( reinterpret_cast(state.data.read.buffer.data()), bytes_transferred ); lock_guard_t guard(state.lock); state.socket.handle_read = false; if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); else if (not success) interrupt(); else { start_read(); } } ); } else { state.ssl.detected = true; start_handshake(); } } ) ); return; } state.socket.wait_handshake = true; auto on_handshake = [this, self](const ec_t &ec, size_t bytes_transferred){ lock_guard_t guard(state.lock); state.socket.wait_handshake = false; if (state.socket.cancel_handshake) { state.socket.cancel_handshake = false; if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) { ec_t ec; 
connection_basic::socket_.next_layer().shutdown( socket_t::shutdown_both, ec ); connection_basic::socket_.next_layer().close(ec); state.socket.connected = false; interrupt(); } else { state.ssl.handshaked = true; start_write(); start_read(); } }; const auto handshake = handshake_t::server; static_cast( connection_basic::get_state() ).ssl_options().configure(connection_basic::socket_, handshake); strand.post( [this, self, on_handshake]{ connection_basic::socket_.async_handshake( handshake, boost::asio::buffer( state.data.read.buffer.data(), state.ssl.forced ? 0 : epee::net_utils::get_ssl_magic_size() ), strand.wrap(on_handshake) ); } ); } template void connection::start_read() { if (state.timers.throttle.in.wait_expire || state.socket.wait_read || state.socket.handle_read ) return; auto self = connection::shared_from_this(); if (connection_type != e_connection_type_RPC) { auto calc_duration = []{ CRITICAL_REGION_LOCAL( network_throttle_manager_t::m_lock_get_global_throttle_in ); return std::chrono::duration_cast::duration_t>( std::chrono::duration( std::min( network_throttle_manager_t::get_global_throttle_in( ).get_sleep_time_after_tick(1), 1.0 ) ) ); }; const auto duration = calc_duration(); if (duration > duration_t{}) { ec_t ec; timers.throttle.in.expires_from_now(duration, ec); state.timers.throttle.in.wait_expire = true; timers.throttle.in.async_wait([this, self](const ec_t &ec){ lock_guard_t guard(state.lock); state.timers.throttle.in.wait_expire = false; if (state.timers.throttle.in.cancel_expire) { state.timers.throttle.in.cancel_expire = false; if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) interrupt(); else start_read(); }); return; } } state.socket.wait_read = true; auto on_read = [this, self](const ec_t &ec, size_t bytes_transferred){ lock_guard_t guard(state.lock); state.socket.wait_read = false; 
if (state.socket.cancel_read) { state.socket.cancel_read = false; if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) terminate(); else { { state.stat.in.throttle.handle_trafic_exact(bytes_transferred); const auto speed = state.stat.in.throttle.get_current_speed(); context.m_current_speed_down = speed; context.m_max_speed_down = std::max( context.m_max_speed_down, speed ); { CRITICAL_REGION_LOCAL( network_throttle_manager_t::m_lock_get_global_throttle_in ); network_throttle_manager_t::get_global_throttle_in( ).handle_trafic_exact(bytes_transferred); } connection_basic::logger_handle_net_read(bytes_transferred); context.m_last_recv = time(NULL); context.m_recv_cnt += bytes_transferred; start_timer(get_timeout_from_bytes_read(bytes_transferred), true); } state.socket.handle_read = true; connection_basic::strand_.post( [this, self, bytes_transferred]{ bool success = handler.handle_recv( reinterpret_cast(state.data.read.buffer.data()), bytes_transferred ); lock_guard_t guard(state.lock); state.socket.handle_read = false; if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); else if (not success) interrupt(); else { start_read(); } } ); } }; if (not state.ssl.enabled) connection_basic::socket_.next_layer().async_read_some( boost::asio::buffer( state.data.read.buffer.data(), state.data.read.buffer.size() ), strand.wrap(on_read) ); else strand.post( [this, self, on_read]{ connection_basic::socket_.async_read_some( boost::asio::buffer( state.data.read.buffer.data(), state.data.read.buffer.size() ), strand.wrap(on_read) ); } ); } template void connection::start_write() { if (state.timers.throttle.out.wait_expire || state.socket.wait_write || state.data.write.queue.empty() || (state.ssl.enabled && not state.ssl.handshaked) ) return; auto self = 
connection::shared_from_this(); if (connection_type != e_connection_type_RPC) { auto calc_duration = [this]{ CRITICAL_REGION_LOCAL( network_throttle_manager_t::m_lock_get_global_throttle_out ); return std::chrono::duration_cast::duration_t>( std::chrono::duration( std::min( network_throttle_manager_t::get_global_throttle_out( ).get_sleep_time_after_tick( state.data.write.queue.back().size() ), 1.0 ) ) ); }; const auto duration = calc_duration(); if (duration > duration_t{}) { ec_t ec; timers.throttle.out.expires_from_now(duration, ec); state.timers.throttle.out.wait_expire = true; timers.throttle.out.async_wait([this, self](const ec_t &ec){ lock_guard_t guard(state.lock); state.timers.throttle.out.wait_expire = false; if (state.timers.throttle.out.cancel_expire) { state.timers.throttle.out.cancel_expire = false; if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) interrupt(); else start_write(); }); } } state.socket.wait_write = true; auto on_write = [this, self](const ec_t &ec, size_t bytes_transferred){ lock_guard_t guard(state.lock); state.socket.wait_write = false; if (state.socket.cancel_write) { state.socket.cancel_write = false; state.data.write.queue.clear(); if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) { state.data.write.queue.clear(); interrupt(); } else { { state.stat.out.throttle.handle_trafic_exact(bytes_transferred); const auto speed = state.stat.out.throttle.get_current_speed(); context.m_current_speed_up = speed; context.m_max_speed_down = std::max( context.m_max_speed_down, speed ); { CRITICAL_REGION_LOCAL( network_throttle_manager_t::m_lock_get_global_throttle_out ); network_throttle_manager_t::get_global_throttle_out( 
).handle_trafic_exact(bytes_transferred); } connection_basic::logger_handle_net_write(bytes_transferred); context.m_last_send = time(NULL); context.m_send_cnt += bytes_transferred; start_timer(get_default_timeout(), true); } assert(bytes_transferred == state.data.write.queue.back().size()); state.data.write.queue.pop_back(); state.condition.notify_all(); start_write(); } }; if (not state.ssl.enabled) boost::asio::async_write( connection_basic::socket_.next_layer(), boost::asio::buffer( state.data.write.queue.back().data(), state.data.write.queue.back().size() ), strand.wrap(on_write) ); else strand.post( [this, self, on_write]{ boost::asio::async_write( connection_basic::socket_, boost::asio::buffer( state.data.write.queue.back().data(), state.data.write.queue.back().size() ), strand.wrap(on_write) ); } ); } template void connection::start_shutdown() { if (state.socket.wait_shutdown) return; auto self = connection::shared_from_this(); state.socket.wait_shutdown = true; auto on_shutdown = [this, self](const ec_t &ec){ lock_guard_t guard(state.lock); state.socket.wait_shutdown = false; if (state.socket.cancel_shutdown) { state.socket.cancel_shutdown = false; if (state.status == status_t::RUNNING) interrupt(); else if (state.status == status_t::INTERRUPTED) terminate(); else if (state.status == status_t::TERMINATING) on_terminating(); } else if (ec.value()) terminate(); else { cancel_timer(); on_interrupted(); } }; strand.post( [this, self, on_shutdown]{ connection_basic::socket_.async_shutdown( strand.wrap(on_shutdown) ); } ); start_timer(get_default_timeout()); } template void connection::cancel_socket() { bool wait_socket = false; if (state.socket.wait_handshake) wait_socket = state.socket.cancel_handshake = true; if (state.timers.throttle.in.wait_expire) { state.timers.throttle.in.cancel_expire = true; ec_t ec; timers.throttle.in.cancel(ec); } if (state.socket.wait_read) wait_socket = state.socket.cancel_read = true; if (state.timers.throttle.out.wait_expire) { 
state.timers.throttle.out.cancel_expire = true; ec_t ec; timers.throttle.out.cancel(ec); } if (state.socket.wait_write) wait_socket = state.socket.cancel_write = true; if (state.socket.wait_shutdown) wait_socket = state.socket.cancel_shutdown = true; if (wait_socket) { ec_t ec; connection_basic::socket_.next_layer().cancel(ec); } } template void connection::cancel_handler() { if (state.protocol.released || state.protocol.wait_release) return; state.protocol.wait_release = true; state.lock.unlock(); handler.release_protocol(); state.lock.lock(); state.protocol.wait_release = false; state.protocol.released = true; if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); } template void connection::interrupt() { if (state.status != status_t::RUNNING) return; state.status = status_t::INTERRUPTED; cancel_timer(); cancel_socket(); on_interrupted(); state.condition.notify_all(); cancel_handler(); } template void connection::on_interrupted() { assert(state.status == status_t::INTERRUPTED); if (state.timers.general.wait_expire) return; if (state.socket.wait_handshake) return; if (state.timers.throttle.in.wait_expire) return; if (state.socket.wait_read) return; if (state.socket.handle_read) return; if (state.timers.throttle.out.wait_expire) return; if (state.socket.wait_write) return; if (state.socket.wait_shutdown) return; if (state.protocol.wait_init) return; if (state.protocol.wait_callback) return; if (state.protocol.wait_release) return; if (state.socket.connected) { if (not state.ssl.enabled) { ec_t ec; connection_basic::socket_.next_layer().shutdown( socket_t::shutdown_both, ec ); connection_basic::socket_.next_layer().close(ec); state.socket.connected = false; state.status = status_t::WASTED; } else start_shutdown(); } else state.status = status_t::WASTED; } template void connection::terminate() { if (state.status != status_t::RUNNING && state.status != status_t::INTERRUPTED ) return; state.status 
= status_t::TERMINATING; cancel_timer(); cancel_socket(); on_terminating(); state.condition.notify_all(); cancel_handler(); } template void connection::on_terminating() { assert(state.status == status_t::TERMINATING); if (state.timers.general.wait_expire) return; if (state.socket.wait_handshake) return; if (state.timers.throttle.in.wait_expire) return; if (state.socket.wait_read) return; if (state.socket.handle_read) return; if (state.timers.throttle.out.wait_expire) return; if (state.socket.wait_write) return; if (state.socket.wait_shutdown) return; if (state.protocol.wait_init) return; if (state.protocol.wait_callback) return; if (state.protocol.wait_release) return; if (state.socket.connected) { ec_t ec; connection_basic::socket_.next_layer().shutdown( socket_t::shutdown_both, ec ); connection_basic::socket_.next_layer().close(ec); state.socket.connected = false; } state.status = status_t::WASTED; } template bool connection::send(byte_slice_t message) { lock_guard_t guard(state.lock); if (state.status != status_t::RUNNING || state.socket.wait_handshake) return false; auto wait_consume = [this] { auto random_delay = []{ using engine = std::mt19937; std::random_device dev; std::seed_seq::result_type rand[ engine::state_size // Use complete bit space ]{}; std::generate_n(rand, engine::state_size, std::ref(dev)); std::seed_seq seed(rand, rand + engine::state_size); engine rng(seed); return std::chrono::milliseconds( std::uniform_int_distribution<>(5000, 6000)(rng) ); }; if (state.data.write.queue.size() <= ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) return true; state.data.write.wait_consume = true; bool success = state.condition.wait_for( state.lock, random_delay(), [this]{ return ( state.status != status_t::RUNNING || state.data.write.queue.size() <= ABSTRACT_SERVER_SEND_QUE_MAX_COUNT ); } ); state.data.write.wait_consume = false; if (not success) { terminate(); return false; } else return state.status == status_t::RUNNING; }; auto wait_sender = [this] { 
state.condition.wait( state.lock, [this] { return ( state.status != status_t::RUNNING || not state.data.write.wait_consume ); } ); return state.status == status_t::RUNNING; }; if (not wait_sender()) return false; constexpr size_t CHUNK_SIZE = 32 * 1024; if (connection_type == e_connection_type_RPC || message.size() <= 2 * CHUNK_SIZE ) { if (not wait_consume()) return false; state.data.write.queue.emplace_front(std::move(message)); start_write(); } else { while (!message.empty()) { if (not wait_consume()) return false; state.data.write.queue.emplace_front( message.take_slice(CHUNK_SIZE) ); start_write(); } } state.condition.notify_all(); return true; } template bool connection::start_internal( bool is_income, bool is_multithreaded, boost::optional real_remote ) { unique_lock_t guard(state.lock); if (state.status != status_t::TERMINATED) return false; if (not real_remote) { ec_t ec; auto endpoint = connection_basic::socket_.next_layer().remote_endpoint( ec ); if (ec.value()) return false; real_remote = ( endpoint.address().is_v6() ? 
network_address{ ipv6_network_address{endpoint.address().to_v6(), endpoint.port()} } : network_address{ ipv4_network_address{ uint32_t{ boost::asio::detail::socket_ops::host_to_network_long( endpoint.address().to_v4().to_ulong() ) }, endpoint.port() } } ); } auto *filter = static_cast( connection_basic::get_state() ).pfilter; if (filter and not filter->is_remote_host_allowed(*real_remote)) return false; ec_t ec; #if !defined(_WIN32) || !defined(__i686) connection_basic::socket_.next_layer().set_option( boost::asio::detail::socket_option::integer{ connection_basic::get_tos_flag() }, ec ); if (ec.value()) return false; #endif connection_basic::socket_.next_layer().set_option( boost::asio::ip::tcp::no_delay{false}, ec ); if (ec.value()) return false; connection_basic::m_is_multithreaded = is_multithreaded; context.set_details( boost::uuids::random_generator()(), *real_remote, is_income, connection_basic::m_ssl_support == ssl_support_t::e_ssl_support_enabled ); host = real_remote->host_str(); try { host_count(1); } catch(...) { /* ignore */ } local = real_remote->is_loopback() || real_remote->is_local(); state.ssl.enabled = ( connection_basic::m_ssl_support != ssl_support_t::e_ssl_support_disabled ); state.ssl.forced = ( connection_basic::m_ssl_support == ssl_support_t::e_ssl_support_enabled ); state.socket.connected = true; state.status = status_t::RUNNING; start_timer( std::chrono::milliseconds( local ? 
NEW_CONNECTION_TIMEOUT_LOCAL : NEW_CONNECTION_TIMEOUT_REMOTE ) ); state.protocol.wait_init = true; guard.unlock(); handler.after_init_connection(); guard.lock(); state.protocol.wait_init = false; state.protocol.initialized = true; if (state.status == status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); else if (not is_income || not state.ssl.enabled) start_read(); else start_handshake(); return true; } template connection::connection( io_context_t &io_context, std::shared_ptr shared_state, t_connection_type connection_type, ssl_support_t ssl_support ): connection( std::move(socket_t{io_context}), std::move(shared_state), connection_type, ssl_support ) { } template connection::connection( socket_t &&socket, std::shared_ptr shared_state, t_connection_type connection_type, ssl_support_t ssl_support ): connection_basic(std::move(socket), shared_state, ssl_support), handler(this, *shared_state, context), connection_type(connection_type), io_context{GET_IO_SERVICE(connection_basic::socket_)}, strand{io_context}, timers{io_context} { } template connection::~connection() noexcept(false) { lock_guard_t guard(state.lock); assert(state.status == status_t::TERMINATED || state.status == status_t::WASTED || io_context.stopped() ); if (state.status != status_t::WASTED) return; try { host_count(-1); } catch (...) 
{ /* ignore */ } } template bool connection::start( bool is_income, bool is_multithreaded ) { return start_internal(is_income, is_multithreaded, {}); } template bool connection::start( bool is_income, bool is_multithreaded, network_address real_remote ) { return start_internal(is_income, is_multithreaded, real_remote); } template void connection::save_dbg_log() { lock_guard_t guard(state.lock); string_t address; string_t port; ec_t ec; auto endpoint = connection_basic::socket().remote_endpoint(ec); if (ec.value()) { address = ""; port = ""; } else { address = endpoint.address().to_string(); port = std::to_string(endpoint.port()); } MDEBUG( " connection type " << std::to_string(connection_type) << " " << connection_basic::socket().local_endpoint().address().to_string() << ":" << connection_basic::socket().local_endpoint().port() << " <--> " << context.m_remote_address.str() << " (via " << address << ":" << port << ")" ); } template bool connection::speed_limit_is_enabled() const { return connection_type != e_connection_type_RPC; } template bool connection::cancel() { return close(); } template bool connection::do_send(byte_slice message) { return send(std::move(message)); } template bool connection::send_done() { return true; } template bool connection::close() { lock_guard_t guard(state.lock); if (state.status != status_t::RUNNING) return false; terminate(); return true; } template bool connection::call_run_once_service_io() { if(connection_basic::m_is_multithreaded) { if (not io_context.poll_one()) misc_utils::sleep_no_w(1); } else { if (!io_context.run_one()) return false; } return true; } template bool connection::request_callback() { lock_guard_t guard(state.lock); if (state.status != status_t::RUNNING) return false; auto self = connection::shared_from_this(); ++state.protocol.wait_callback; connection_basic::strand_.post([this, self]{ handler.handle_qued_callback(); lock_guard_t guard(state.lock); --state.protocol.wait_callback; if (state.status == 
status_t::INTERRUPTED) on_interrupted(); else if (state.status == status_t::TERMINATING) on_terminating(); }); return true; } template typename connection::io_context_t &connection::get_io_service() { return io_context; } template bool connection::add_ref() { try { auto self = connection::shared_from_this(); lock_guard_t guard(state.lock); this->self = std::move(self); ++state.protocol.reference_counter; return true; } catch (boost::bad_weak_ptr &exception) { return false; } } template bool connection::release() { connection_ptr self; lock_guard_t guard(state.lock); if (not --state.protocol.reference_counter) self = std::move(this->self); return true; } template void connection::setRpcStation() { lock_guard_t guard(state.lock); connection_type = e_connection_type_RPC; } template boosted_tcp_server::boosted_tcp_server( t_connection_type connection_type ) : m_state(std::make_shared::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), default_remote(), m_stop_signal_sent(false), m_port(0), m_threads_count(0), m_thread_index(0), m_connection_type( connection_type ), new_connection_(), new_connection_ipv6() { create_server_type_map(); m_thread_name_prefix = "NET"; } template boosted_tcp_server::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : m_state(std::make_shared::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), default_remote(), m_stop_signal_sent(false), m_port(0), m_threads_count(0), m_thread_index(0), m_connection_type(connection_type), new_connection_(), new_connection_ipv6() { create_server_type_map(); m_thread_name_prefix = "NET"; } //--------------------------------------------------------------------------------- template boosted_tcp_server::~boosted_tcp_server() { this->send_stop_signal(); timed_wait_server_stop(10000); } 
//--------------------------------------------------------------------------------- template void boosted_tcp_server::create_server_type_map() { server_type_map["NET"] = e_connection_type_NET; server_type_map["RPC"] = e_connection_type_RPC; server_type_map["P2P"] = e_connection_type_P2P; } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::init_server(uint32_t port, const std::string& address, uint32_t port_ipv6, const std::string& address_ipv6, bool use_ipv6, bool require_ipv4, ssl_options_t ssl_options) { TRY_ENTRY(); m_stop_signal_sent = false; m_port = port; m_port_ipv6 = port_ipv6; m_address = address; m_address_ipv6 = address_ipv6; m_use_ipv6 = use_ipv6; m_require_ipv4 = require_ipv4; if (ssl_options) m_state->configure_ssl(std::move(ssl_options)); std::string ipv4_failed = ""; std::string ipv6_failed = ""; try { boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(address, boost::lexical_cast(port), boost::asio::ip::tcp::resolver::query::canonical_name); boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); acceptor_.open(endpoint.protocol()); #if !defined(_WIN32) acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); #endif acceptor_.bind(endpoint); acceptor_.listen(); boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint(); m_port = binded_endpoint.port(); MDEBUG("start accept (IPv4)"); new_connection_.reset(new connection(io_service_, m_state, m_connection_type, m_state->ssl_options().support)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server::handle_accept_ipv4, this, boost::asio::placeholders::error)); } catch (const std::exception &e) { ipv4_failed = e.what(); } if (ipv4_failed != "") { MERROR("Failed to bind IPv4: " << ipv4_failed); if (require_ipv4) { throw std::runtime_error("Failed to bind IPv4 (set to required)"); } } if (use_ipv6) { try { if 
(port_ipv6 == 0) port_ipv6 = port; // default arg means bind to same port as ipv4 boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(address_ipv6, boost::lexical_cast(port_ipv6), boost::asio::ip::tcp::resolver::query::canonical_name); boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); acceptor_ipv6.open(endpoint.protocol()); #if !defined(_WIN32) acceptor_ipv6.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); #endif acceptor_ipv6.set_option(boost::asio::ip::v6_only(true)); acceptor_ipv6.bind(endpoint); acceptor_ipv6.listen(); boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_ipv6.local_endpoint(); m_port_ipv6 = binded_endpoint.port(); MDEBUG("start accept (IPv6)"); new_connection_ipv6.reset(new connection(io_service_, m_state, m_connection_type, m_state->ssl_options().support)); acceptor_ipv6.async_accept(new_connection_ipv6->socket(), boost::bind(&boosted_tcp_server::handle_accept_ipv6, this, boost::asio::placeholders::error)); } catch (const std::exception &e) { ipv6_failed = e.what(); } } if (use_ipv6 && ipv6_failed != "") { MERROR("Failed to bind IPv6: " << ipv6_failed); if (ipv4_failed != "") { throw std::runtime_error("Failed to bind IPv4 and IPv6"); } } return true; } catch (const std::exception &e) { MFATAL("Error starting server: " << e.what()); return false; } catch (...) 
{ MFATAL("Error starting server"); return false; } } //----------------------------------------------------------------------------- template bool boosted_tcp_server::init_server(const std::string port, const std::string& address, const std::string port_ipv6, const std::string address_ipv6, bool use_ipv6, bool require_ipv4, ssl_options_t ssl_options) { uint32_t p = 0; uint32_t p_ipv6 = 0; if (port.size() && !string_tools::get_xtype_from_string(p, port)) { MERROR("Failed to convert port no = " << port); return false; } if (port_ipv6.size() && !string_tools::get_xtype_from_string(p_ipv6, port_ipv6)) { MERROR("Failed to convert port no = " << port_ipv6); return false; } return this->init_server(p, address, p_ipv6, address_ipv6, use_ipv6, require_ipv4, std::move(ssl_options)); } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::worker_thread() { TRY_ENTRY(); const uint32_t local_thr_index = m_thread_index++; // atomically increment, getting value before increment std::string thread_name = std::string("[") + m_thread_name_prefix; thread_name += boost::to_string(local_thr_index) + "]"; MLOG_SET_THREAD_NAME(thread_name); // _fact("Thread name: " << m_thread_name_prefix); while(!m_stop_signal_sent) { try { io_service_.run(); return true; } catch(const std::exception& ex) { _erro("Exception at server worker thread, what=" << ex.what()); } catch(...) 
{ _erro("Exception at server worker thread, unknown execption"); } } //_info("Worker thread finished"); return true; CATCH_ENTRY_L0("boosted_tcp_server::worker_thread", false); } //--------------------------------------------------------------------------------- template void boosted_tcp_server::set_threads_prefix(const std::string& prefix_name) { m_thread_name_prefix = prefix_name; auto it = server_type_map.find(m_thread_name_prefix); if (it==server_type_map.end()) throw std::runtime_error("Unknown prefix/server type:" + std::string(prefix_name)); auto connection_type = it->second; // the value of type MINFO("Set server type to: " << connection_type << " from name: " << m_thread_name_prefix << ", prefix_name = " << prefix_name); } //--------------------------------------------------------------------------------- template void boosted_tcp_server::set_connection_filter(i_connection_filter* pfilter) { assert(m_state != nullptr); // always set in constructor m_state->pfilter = pfilter; } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::run_server(size_t threads_count, bool wait, const boost::thread::attributes& attrs) { TRY_ENTRY(); m_threads_count = threads_count; m_main_thread_id = boost::this_thread::get_id(); MLOG_SET_THREAD_NAME("[SRV_MAIN]"); while(!m_stop_signal_sent) { // Create a pool of threads to run all of the io_services. CRITICAL_REGION_BEGIN(m_threads_lock); for (std::size_t i = 0; i < threads_count; ++i) { boost::shared_ptr thread(new boost::thread( attrs, boost::bind(&boosted_tcp_server::worker_thread, this))); _note("Run server thread name: " << m_thread_name_prefix); m_threads.push_back(thread); } CRITICAL_REGION_END(); // Wait for all threads in the pool to exit. 
if (wait) { _fact("JOINING all threads"); for (std::size_t i = 0; i < m_threads.size(); ++i) { m_threads[i]->join(); } _fact("JOINING all threads - almost"); m_threads.clear(); _fact("JOINING all threads - DONE"); } else { _dbg1("Reiniting OK."); return true; } if(wait && !m_stop_signal_sent) { //some problems with the listening socket ?.. _dbg1("Net service stopped without stop request, restarting..."); if(!this->init_server(m_port, m_address, m_port_ipv6, m_address_ipv6, m_use_ipv6, m_require_ipv4)) { _dbg1("Reiniting service failed, exit."); return false; }else { _dbg1("Reiniting OK."); } } } return true; CATCH_ENTRY_L0("boosted_tcp_server::run_server", false); } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::is_thread_worker() { TRY_ENTRY(); CRITICAL_REGION_LOCAL(m_threads_lock); BOOST_FOREACH(boost::shared_ptr& thp, m_threads) { if(thp->get_id() == boost::this_thread::get_id()) return true; } if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id) return true; return false; CATCH_ENTRY_L0("boosted_tcp_server::is_thread_worker", false); } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::timed_wait_server_stop(uint64_t wait_mseconds) { TRY_ENTRY(); boost::chrono::milliseconds ms(wait_mseconds); for (std::size_t i = 0; i < m_threads.size(); ++i) { if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) { _dbg1("Interrupting thread " << m_threads[i]->native_handle()); m_threads[i]->interrupt(); } } return true; CATCH_ENTRY_L0("boosted_tcp_server::timed_wait_server_stop", false); } //--------------------------------------------------------------------------------- template void boosted_tcp_server::send_stop_signal() { m_stop_signal_sent = true; typename connection::shared_state *state = static_cast::shared_state*>(m_state.get()); state->stop_signal_sent = true; TRY_ENTRY(); connections_mutex.lock(); 
for (auto &c: connections_) { c->cancel(); } connections_.clear(); connections_mutex.unlock(); io_service_.stop(); CATCH_ENTRY_L0("boosted_tcp_server::send_stop_signal()", void()); } //--------------------------------------------------------------------------------- template void boosted_tcp_server::handle_accept_ipv4(const boost::system::error_code& e) { this->handle_accept(e, false); } //--------------------------------------------------------------------------------- template void boosted_tcp_server::handle_accept_ipv6(const boost::system::error_code& e) { this->handle_accept(e, true); } //--------------------------------------------------------------------------------- template void boosted_tcp_server::handle_accept(const boost::system::error_code& e, bool ipv6) { MDEBUG("handle_accept"); boost::asio::ip::tcp::acceptor* current_acceptor = &acceptor_; connection_ptr* current_new_connection = &new_connection_; auto accept_function_pointer = &boosted_tcp_server::handle_accept_ipv4; if (ipv6) { current_acceptor = &acceptor_ipv6; current_new_connection = &new_connection_ipv6; accept_function_pointer = &boosted_tcp_server::handle_accept_ipv6; } try { if (!e) { if (m_connection_type == e_connection_type_RPC) { const char *ssl_message = "unknown"; switch ((*current_new_connection)->get_ssl_support()) { case epee::net_utils::ssl_support_t::e_ssl_support_disabled: ssl_message = "disabled"; break; case epee::net_utils::ssl_support_t::e_ssl_support_enabled: ssl_message = "enabled"; break; case epee::net_utils::ssl_support_t::e_ssl_support_autodetect: ssl_message = "autodetection"; break; } MDEBUG("New server for RPC connections, SSL " << ssl_message); (*current_new_connection)->setRpcStation(); // hopefully this is not needed actually } connection_ptr conn(std::move((*current_new_connection))); (*current_new_connection).reset(new connection(io_service_, m_state, m_connection_type, conn->get_ssl_support())); 
current_acceptor->async_accept((*current_new_connection)->socket(), boost::bind(accept_function_pointer, this, boost::asio::placeholders::error)); boost::asio::socket_base::keep_alive opt(true); conn->socket().set_option(opt); bool res; if (default_remote.get_type_id() == net_utils::address_type::invalid) res = conn->start(true, 1 < m_threads_count); else res = conn->start(true, 1 < m_threads_count, default_remote); if (!res) { conn->cancel(); return; } conn->save_dbg_log(); return; } else { MERROR("Error in boosted_tcp_server::handle_accept: " << e); } } catch (const std::exception &e) { MERROR("Exception in boosted_tcp_server::handle_accept: " << e.what()); } // error path, if e or exception assert(m_state != nullptr); // always set in constructor _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count); misc_utils::sleep_no_w(100); (*current_new_connection).reset(new connection(io_service_, m_state, m_connection_type, (*current_new_connection)->get_ssl_support())); current_acceptor->async_accept((*current_new_connection)->socket(), boost::bind(accept_function_pointer, this, boost::asio::placeholders::error)); } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::add_connection(t_connection_context& out, boost::asio::ip::tcp::socket&& sock, network_address real_remote, epee::net_utils::ssl_support_t ssl_support) { if(std::addressof(get_io_service()) == std::addressof(GET_IO_SERVICE(sock))) { connection_ptr conn(new connection(std::move(sock), m_state, m_connection_type, ssl_support)); if(conn->start(false, 1 < m_threads_count, std::move(real_remote))) { conn->get_context(out); conn->save_dbg_log(); return true; } } else { MWARNING(out << " was not added, socket/io_service mismatch"); } return false; } //--------------------------------------------------------------------------------- template typename boosted_tcp_server::try_connect_result_t 
boosted_tcp_server::try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); sock_.open(remote_endpoint.protocol()); if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" ) { boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(bind_ip.c_str()), 0); boost::system::error_code ec; sock_.bind(local_endpoint, ec); if (ec) { MERROR("Error binding to " << bind_ip << ": " << ec.message()); if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } } /* NOTICE: be careful to make sync connection from event handler: in case if all threads suddenly do sync connect, there will be no thread to dispatch events from io service. */ boost::system::error_code ec = boost::asio::error::would_block; //have another free thread(s), work in wait mode, without event handling struct local_async_context { boost::system::error_code ec; boost::mutex connect_mut; boost::condition_variable cond; }; boost::shared_ptr local_shared_context(new local_async_context()); local_shared_context->ec = boost::asio::error::would_block; boost::unique_lock lock(local_shared_context->connect_mut); auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr shared_context) { shared_context->connect_mut.lock(); shared_context->ec = ec_; shared_context->cond.notify_one(); shared_context->connect_mut.unlock(); }; sock_.async_connect(remote_endpoint, std::bind(connect_callback, std::placeholders::_1, local_shared_context)); while(local_shared_context->ec == boost::asio::error::would_block) { bool r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout)); if (m_stop_signal_sent) { if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } 
if(local_shared_context->ec == boost::asio::error::would_block && !r) { //timeout sock_.close(); _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); return CONNECT_FAILURE; } } ec = local_shared_context->ec; if (ec || !sock_.is_open()) { _dbg3("Some problems at connect, message: " << ec.message()); if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } _dbg3("Connected success to " << adr << ':' << port); const ssl_support_t ssl_support = new_connection_l->get_ssl_support(); if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { // Handshake MDEBUG("Handshaking SSL..."); if (!new_connection_l->handshake(boost::asio::ssl::stream_base::client)) { if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); sock_.close(); return CONNECT_NO_SSL; } MERROR("SSL handshake failed"); if (sock_.is_open()) sock_.close(); return CONNECT_FAILURE; } } return CONNECT_SUCCESS; CATCH_ENTRY_L0("boosted_tcp_server::try_connect", CONNECT_FAILURE); } //--------------------------------------------------------------------------------- template bool boosted_tcp_server::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); connection_ptr new_connection_l(new connection(io_service_, m_state, m_connection_type, ssl_support) ); connections_mutex.lock(); connections_.insert(new_connection_l); MDEBUG("connections_ size now " << connections_.size()); connections_mutex.unlock(); epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ CRITICAL_REGION_LOCAL(connections_mutex); connections_.erase(new_connection_l); 
}); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); bool try_ipv6 = false; boost::asio::ip::tcp::resolver resolver(io_service_); boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); boost::system::error_code resolve_error; boost::asio::ip::tcp::resolver::iterator iterator; try { //resolving ipv4 address as ipv6 throws, catch here and move on iterator = resolver.resolve(query, resolve_error); } catch (const boost::system::system_error& e) { if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found && resolve_error != boost::asio::error::host_not_found_try_again)) { throw; } try_ipv6 = true; } catch (...) { throw; } std::string bind_ip_to_use; boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { if (!m_use_ipv6) { _erro("Failed to resolve " << adr); return false; } else { try_ipv6 = true; MINFO("Resolving address as IPv4 failed, trying IPv6"); } } else { bind_ip_to_use = bind_ip; } if (try_ipv6) { boost::asio::ip::tcp::resolver::query query6(boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); iterator = resolver.resolve(query6, resolve_error); if(iterator == end) { _erro("Failed to resolve " << adr); return false; } else { if (bind_ip == "0.0.0.0") { bind_ip_to_use = "::"; } else { bind_ip_to_use = ""; } } } MDEBUG("Trying to connect to " << adr << ":" << port << ", bind_ip = " << bind_ip_to_use); //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port); boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); auto try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, ssl_support); if (try_connect_result == CONNECT_FAILURE) return false; if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect && try_connect_result == CONNECT_NO_SSL) { // we connected, but could 
not connect with SSL, try without MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); new_connection_l->disable_ssl(); try_connect_result = try_connect(new_connection_l, adr, port, sock_, remote_endpoint, bind_ip_to_use, conn_timeout, epee::net_utils::ssl_support_t::e_ssl_support_disabled); if (try_connect_result != CONNECT_SUCCESS) return false; } // start adds the connection to the config object's list, so we don't need to have it locally anymore connections_mutex.lock(); connections_.erase(new_connection_l); connections_mutex.unlock(); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); //new_connection_l.reset(new connection(io_service_, m_config, m_sock_count, m_pfilter)); } else { assert(m_state != nullptr); // always set in constructor _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_state->sock_count); } new_connection_l->save_dbg_log(); return r; CATCH_ENTRY_L0("boosted_tcp_server::connect", false); } //--------------------------------------------------------------------------------- template template bool boosted_tcp_server::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); connection_ptr new_connection_l(new connection(io_service_, m_state, m_connection_type, ssl_support) ); connections_mutex.lock(); connections_.insert(new_connection_l); MDEBUG("connections_ size now " << connections_.size()); connections_mutex.unlock(); epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ CRITICAL_REGION_LOCAL(connections_mutex); connections_.erase(new_connection_l); }); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); bool try_ipv6 = false; boost::asio::ip::tcp::resolver 
resolver(io_service_); boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); boost::system::error_code resolve_error; boost::asio::ip::tcp::resolver::iterator iterator; try { //resolving ipv4 address as ipv6 throws, catch here and move on iterator = resolver.resolve(query, resolve_error); } catch (const boost::system::system_error& e) { if (!m_use_ipv6 || (resolve_error != boost::asio::error::host_not_found && resolve_error != boost::asio::error::host_not_found_try_again)) { throw; } try_ipv6 = true; } catch (...) { throw; } boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { if (!try_ipv6) { _erro("Failed to resolve " << adr); return false; } else { MINFO("Resolving address as IPv4 failed, trying IPv6"); } } if (try_ipv6) { boost::asio::ip::tcp::resolver::query query6(boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name); iterator = resolver.resolve(query6, resolve_error); if(iterator == end) { _erro("Failed to resolve " << adr); return false; } } boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); sock_.open(remote_endpoint.protocol()); if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" ) { boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(bind_ip.c_str()), 0); boost::system::error_code ec; sock_.bind(local_endpoint, ec); if (ec) { MERROR("Error binding to " << bind_ip << ": " << ec.message()); if (sock_.is_open()) sock_.close(); return false; } } boost::shared_ptr sh_deadline(new boost::asio::deadline_timer(io_service_)); //start deadline sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout)); sh_deadline->async_wait([=](const boost::system::error_code& error) { if(error != boost::asio::error::operation_aborted) { _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); new_connection_l->socket().close(); } 
}); //start async connect sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_) { t_connection_context conn_context = AUTO_VAL_INIT(conn_context); boost::system::error_code ignored_ec; boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec); if(!ec_) {//success if(!sh_deadline->cancel()) { cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation }else { _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port()); // start adds the connection to the config object's list, so we don't need to have it locally anymore connections_mutex.lock(); connections_.erase(new_connection_l); connections_mutex.unlock(); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); cb(conn_context, ec_); } else { _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); cb(conn_context, boost::asio::error::fault); } } }else { _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); cb(conn_context, ec_); } }); return true; CATCH_ENTRY_L0("boosted_tcp_server::connect_async", false); } } // namespace } // namespace