@ -6,7 +6,7 @@
@@ -6,7 +6,7 @@
*/
// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright
@ -17,7 +17,7 @@
@@ -17,7 +17,7 @@
// * Neither the name of the Andrey N. Sabelnikov nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -28,7 +28,7 @@
@@ -28,7 +28,7 @@
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//
@ -72,14 +72,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -72,14 +72,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
connection<t_protocol_handler>::connection( boost::asio::io_service& io_service,
typename t_protocol_handler::config_type& config,
std::atomic<long> &ref_sock_count, // the ++/-- counter
typename t_protocol_handler::config_type& config,
std::atomic<long> &ref_sock_count, // the ++/-- counter
std::atomic<long> &sock_number, // the only increasing ++ number generator
i_connection_filter* &pfilter
,t_connection_type connection_type
)
:
connection_basic(io_service, ref_sock_count, sock_number),
:
connection_basic(io_service, ref_sock_count, sock_number),
m_protocol_handler(this, config, context),
m_pfilter( pfilter ),
m_connection_type( connection_type ),
@ -188,10 +188,10 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -188,10 +188,10 @@ PRAGMA_WARNING_DISABLE_VS(4355)
socket_.set_option( optionTos );
//_dbg1("Set ToS flag to " << tos);
#endif
boost::asio::ip::tcp::no_delay noDelayOption(false);
socket_.set_option(noDelayOption);
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@ -290,7 +290,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -290,7 +290,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{
TRY_ENTRY();
//_info("[sock " << socket_.native_handle() << "] Async read calledback.");
if (!e)
{
{
@ -298,7 +298,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -298,7 +298,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
}
{
CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred);
@ -314,7 +314,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -314,7 +314,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred );
}
delay *= 0.5;
if (delay > 0) {
long int ms = (long int)(delay * 100);
@ -323,7 +323,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -323,7 +323,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}
} while(delay > 0);
} // any form of sleeping
//_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
@ -331,7 +331,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -331,7 +331,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_ready_to_close = false;
bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred);
if(!recv_res)
{
{
//_info("[sock " << socket_.native_handle() << "] protocol_want_close");
//some error in protocol, protocol handler ask to close connection
@ -389,15 +389,15 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -389,15 +389,15 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}else
{
//multi thread model, we can't(!) wait in blocked call
//so we make non blocking call and releasing CPU by calling sleep(0);
//so we make non blocking call and releasing CPU by calling sleep(0);
//if no handlers were called
//TODO: Maybe we need to have have critical section + event + callback to upper protocol to
//ask it inside(!) critical region if we still able to go in event wait...
size_t cnt = socket_.get_io_service().poll_one();
size_t cnt = socket_.get_io_service().poll_one();
if(!cnt)
misc_utils::sleep_no_w(0);
}
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false);
}
@ -423,15 +423,15 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -423,15 +423,15 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (allow_split && (cb > chunksize_max_unsigned)) {
{ // LOCK: chunking
epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical ***
epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical ***
MDEBUG("do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr);
t_safe all = cb; // all bytes to send
t_safe all = cb; // all bytes to send
t_safe pos = 0; // current sending position
// 01234567890
// 01234567890
// ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4
// ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8
// ^^^ (pos=8, len=4) ;
// ^^^ (pos=8, len=4) ;
// const size_t bufsize = chunksize_good; // TODO safecast
// char* buf = new char[ bufsize ];
@ -447,7 +447,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -447,7 +447,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
unsigned long long int len_unsigned = static_cast<long long int>( len );
CHECK_AND_ASSERT_MES(len>0, false, "len not strictly positive"); // (redundant)
CHECK_AND_ASSERT_MES(len_unsigned < std::numeric_limits<size_t>::max(), false, "Invalid len_unsigned"); // yeap we want strong < then max size, to be sure
void *chunk_start = ((char*)ptr) + pos;
MDEBUG("chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos);
CHECK_AND_ASSERT_MES(chunk_start >= ptr, false, "Pointer wraparound"); // not wrapped around address?
@ -506,7 +506,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -506,7 +506,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
context.m_send_cnt += cb;
//some data should be wrote to stream
//request complete
// No sleeping here; sleeping is done once and for all in "handle_write"
m_send_que_lock.lock(); // *** critical ***
@ -539,13 +539,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -539,13 +539,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_send_que.resize(m_send_que.size()+1);
m_send_que.back().assign((const char*)ptr, cb);
if(m_send_que.size() > 1)
{ // active operation should be in progress, nothing to do, just wait last operation callback
auto size_now = cb;
MDEBUG("do_send_chunk() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
//do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
LOG_TRACE_CC(context, "[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
else
@ -566,14 +566,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -566,14 +566,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
reset_timer(get_default_timeout(), false);
boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) ,
//strand_.wrap(
boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
boost::bind(&connection<t_protocol_handler>::handle_write, self, boost::placeholders:: _1, boost::placeholders:: _2)
//)
);
//_dbg3("(chunk): " << size_now);
//logger_handle_net_write(size_now);
//_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
//do_send_handler_stop( ptr , cb ); // empty function
return true;
@ -685,7 +685,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -685,7 +685,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{
shutdown();
}
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::close", false);
}
@ -748,9 +748,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -748,9 +748,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (speed_limit_is_enabled())
do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H)))
CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), void(), "Unexpected queue size");
boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) ,
boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) ,
// strand_.wrap(
boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2)
boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), boost::placeholders:: _1, boost::placeholders:: _2)
// )
);
//_dbg3("(normal)" << size_now);
@ -768,7 +768,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -768,7 +768,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
void connection<t_protocol_handler>::setRpcStation()
{
m_connection_type = e_connection_type_RPC;
m_connection_type = e_connection_type_RPC;
MDEBUG("set m_connection_type = RPC ");
}
@ -787,8 +787,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -787,8 +787,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_io_service_local_instance(new boost::asio::io_service()),
io_service_(*m_io_service_local_instance.get()),
acceptor_(io_service_),
m_stop_signal_sent(false), m_port(0),
m_sock_count(0), m_sock_number(0), m_threads_count(0),
m_stop_signal_sent(false), m_port(0),
m_sock_count(0), m_sock_number(0), m_threads_count(0),
m_pfilter(NULL), m_thread_index(0),
m_connection_type( connection_type ),
new_connection_()
@ -801,8 +801,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -801,8 +801,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) :
io_service_(extarnal_io_service),
acceptor_(io_service_),
m_stop_signal_sent(false), m_port(0),
m_sock_count(0), m_sock_number(0), m_threads_count(0),
m_stop_signal_sent(false), m_port(0),
m_sock_count(0), m_sock_number(0), m_threads_count(0),
m_pfilter(NULL), m_thread_index(0),
m_connection_type(connection_type),
new_connection_()
@ -819,7 +819,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -819,7 +819,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::create_server_type_map()
void boosted_tcp_server<t_protocol_handler>::create_server_type_map()
{
server_type_map["NET"] = e_connection_type_NET;
server_type_map["RPC"] = e_connection_type_RPC;
@ -866,7 +866,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
@@ -866,7 +866,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
PUSH_WARNINGS
DISABLE_GCC_WARNING(maybe-uninitialized)
template<class t_protocol_handler>
bool boosted_tcp_server<t_protocol_handler>::init_server(const std::string port, const std::string& address)
bool boosted_tcp_server<t_protocol_handler>::init_server(const std::string port, const std::string& address)
{
uint32_t p = 0;
@ -882,7 +882,7 @@ POP_WARNINGS
@@ -882,7 +882,7 @@ POP_WARNINGS
bool boosted_tcp_server<t_protocol_handler>::worker_thread()
{
TRY_ENTRY();
uint32_t local_thr_index = boost::interprocess::ipcdetail::atomic_inc32(&m_thread_index);
uint32_t local_thr_index = boost::interprocess::ipcdetail::atomic_inc32(&m_thread_index);
std::string thread_name = std::string("[") + m_thread_name_prefix;
thread_name += boost::to_string(local_thr_index) + "]";
MLOG_SET_THREAD_NAME(thread_name);
@ -954,7 +954,7 @@ POP_WARNINGS
@@ -954,7 +954,7 @@ POP_WARNINGS
m_threads.clear();
_fact("JOINING all threads - DONE");
}
}
else {
_dbg1("Reiniting OK.");
return true;
@ -1089,7 +1089,7 @@ POP_WARNINGS
@@ -1089,7 +1089,7 @@ POP_WARNINGS
connections_mutex.unlock();
epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ CRITICAL_REGION_LOCAL(connections_mutex); connections_.erase(new_connection_l); });
boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
//////////////////////////////////////////////////////////////////////////
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name);
@ -1105,7 +1105,7 @@ POP_WARNINGS
@@ -1105,7 +1105,7 @@ POP_WARNINGS
//boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port);
boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
sock_.open(remote_endpoint.protocol());
if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
{
@ -1143,7 +1143,7 @@ POP_WARNINGS
@@ -1143,7 +1143,7 @@ POP_WARNINGS
shared_context->connect_mut.lock(); shared_context->ec = ec_; shared_context->cond.notify_one(); shared_context->connect_mut.unlock();
};
sock_.async_connect(remote_endpoint, boost::bind<void>(connect_callback, _1, local_shared_context));
sock_.async_connect(remote_endpoint, boost::bind<void>(connect_callback, boost::placeholders:: _1, local_shared_context));
while(local_shared_context->ec == boost::asio::error::would_block)
{
bool r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout));
@ -1187,7 +1187,7 @@ POP_WARNINGS
@@ -1187,7 +1187,7 @@ POP_WARNINGS
{
_erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count);
}
new_connection_l->save_dbg_log();
return r;
@ -1198,7 +1198,7 @@ POP_WARNINGS
@@ -1198,7 +1198,7 @@ POP_WARNINGS
template<class t_protocol_handler> template<class t_callback>
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip)
{
TRY_ENTRY();
TRY_ENTRY();
connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) );
connections_mutex.lock();
connections_.insert(new_connection_l);
@ -1206,7 +1206,7 @@ POP_WARNINGS
@@ -1206,7 +1206,7 @@ POP_WARNINGS
connections_mutex.unlock();
epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&](){ CRITICAL_REGION_LOCAL(connections_mutex); connections_.erase(new_connection_l); });
boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
//////////////////////////////////////////////////////////////////////////
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name);
@ -1219,7 +1219,7 @@ POP_WARNINGS
@@ -1219,7 +1219,7 @@ POP_WARNINGS
}
//////////////////////////////////////////////////////////////////////////
boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
sock_.open(remote_endpoint.protocol());
if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
{
@ -1234,13 +1234,13 @@ POP_WARNINGS
@@ -1234,13 +1234,13 @@ POP_WARNINGS
return false;
}
}
boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_service_));
//start deadline
sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout));
sh_deadline->async_wait([=](const boost::system::error_code& error)
{
if(error != boost::asio::error::operation_aborted)
if(error != boost::asio::error::operation_aborted)
{
_dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
new_connection_l->socket().close();
@ -1288,7 +1288,7 @@ POP_WARNINGS
@@ -1288,7 +1288,7 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
}
} // namespace
} // namespace
PRAGMA_WARNING_POP