Browse Source

shared pointer for SSU

pull/113/head
orignal 10 years ago
parent
commit
95524c8db3
  1. 53
      SSU.cpp
  2. 16
      SSU.h
  3. 5
      SSUData.cpp
  4. 30
      SSUSession.cpp
  5. 4
      SSUSession.h
  6. 6
      Transports.cpp
  7. 4
      Transports.h

53
SSU.cpp

@ -27,17 +27,15 @@ namespace transport
SSUServer::~SSUServer () SSUServer::~SSUServer ()
{ {
for (auto it: m_Sessions)
delete it.second;
} }
void SSUServer::Start () void SSUServer::Start ()
{ {
m_IsRunning = true; m_IsRunning = true;
m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); m_Thread = new std::thread (std::bind (&SSUServer::Run, this));
m_Service.post (boost::bind (&SSUServer::Receive, this)); m_Service.post (std::bind (&SSUServer::Receive, this));
if (context.SupportsV6 ()) if (context.SupportsV6 ())
m_Service.post (boost::bind (&SSUServer::ReceiveV6, this)); m_Service.post (std::bind (&SSUServer::ReceiveV6, this));
if (i2p::context.IsUnreachable ()) if (i2p::context.IsUnreachable ())
ScheduleIntroducersUpdateTimer (); ScheduleIntroducersUpdateTimer ();
} }
@ -76,7 +74,7 @@ namespace transport
m_Relays[tag] = relay; m_Relays[tag] = relay;
} }
SSUSession * SSUServer::FindRelaySession (uint32_t tag) std::shared_ptr<SSUSession> SSUServer::FindRelaySession (uint32_t tag)
{ {
auto it = m_Relays.find (tag); auto it = m_Relays.find (tag);
if (it != m_Relays.end ()) if (it != m_Relays.end ())
@ -128,13 +126,13 @@ namespace transport
void SSUServer::HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred) void SSUServer::HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred)
{ {
SSUSession * session = nullptr; std::shared_ptr<SSUSession> session;
auto it = m_Sessions.find (from); auto it = m_Sessions.find (from);
if (it != m_Sessions.end ()) if (it != m_Sessions.end ())
session = it->second; session = it->second;
if (!session) if (!session)
{ {
session = new SSUSession (*this, from); session = std::make_shared<SSUSession> (*this, from);
session->WaitForConnect (); session->WaitForConnect ();
m_Sessions[from] = session; m_Sessions[from] = session;
LogPrint ("New SSU session from ", from.address ().to_string (), ":", from.port (), " created"); LogPrint ("New SSU session from ", from.address ().to_string (), ":", from.port (), " created");
@ -142,7 +140,7 @@ namespace transport
session->ProcessNextMessage (buf, bytes_transferred, from); session->ProcessNextMessage (buf, bytes_transferred, from);
} }
SSUSession * SSUServer::FindSession (const i2p::data::RouterInfo * router) const std::shared_ptr<SSUSession> SSUServer::FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const
{ {
if (!router) return nullptr; if (!router) return nullptr;
auto address = router->GetSSUAddress (true); // v4 only auto address = router->GetSSUAddress (true); // v4 only
@ -156,7 +154,7 @@ namespace transport
return FindSession (boost::asio::ip::udp::endpoint (address->host, address->port)); return FindSession (boost::asio::ip::udp::endpoint (address->host, address->port));
} }
SSUSession * SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const std::shared_ptr<SSUSession> SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const
{ {
auto it = m_Sessions.find (e); auto it = m_Sessions.find (e);
if (it != m_Sessions.end ()) if (it != m_Sessions.end ())
@ -165,9 +163,9 @@ namespace transport
return nullptr; return nullptr;
} }
SSUSession * SSUServer::GetSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest) std::shared_ptr<SSUSession> SSUServer::GetSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest)
{ {
SSUSession * session = nullptr; std::shared_ptr<SSUSession> session;
if (router) if (router)
{ {
auto address = router->GetSSUAddress (!context.SupportsV6 ()); auto address = router->GetSSUAddress (!context.SupportsV6 ());
@ -180,7 +178,7 @@ namespace transport
else else
{ {
// otherwise create new session // otherwise create new session
session = new SSUSession (*this, remoteEndpoint, router, peerTest); session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
m_Sessions[remoteEndpoint] = session; m_Sessions[remoteEndpoint] = session;
if (!router->UsesIntroducer ()) if (!router->UsesIntroducer ())
@ -196,7 +194,7 @@ namespace transport
int numIntroducers = address->introducers.size (); int numIntroducers = address->introducers.size ();
if (numIntroducers > 0) if (numIntroducers > 0)
{ {
SSUSession * introducerSession = nullptr; std::shared_ptr<SSUSession> introducerSession;
const i2p::data::RouterInfo::Introducer * introducer = nullptr; const i2p::data::RouterInfo::Introducer * introducer = nullptr;
// we might have a session to introducer already // we might have a session to introducer already
for (int i = 0; i < numIntroducers; i++) for (int i = 0; i < numIntroducers; i++)
@ -217,7 +215,7 @@ namespace transport
LogPrint ("Creating new session to introducer"); LogPrint ("Creating new session to introducer");
introducer = &(address->introducers[0]); // TODO: introducer = &(address->introducers[0]); // TODO:
boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort); boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort);
introducerSession = new SSUSession (*this, introducerEndpoint, router); introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router);
m_Sessions[introducerEndpoint] = introducerSession; m_Sessions[introducerEndpoint] = introducerSession;
} }
// introduce // introduce
@ -232,8 +230,7 @@ namespace transport
{ {
LogPrint (eLogWarning, "Can't connect to unreachable router. No introducers presented"); LogPrint (eLogWarning, "Can't connect to unreachable router. No introducers presented");
m_Sessions.erase (remoteEndpoint); m_Sessions.erase (remoteEndpoint);
delete session; session.reset ();
session = nullptr;
} }
} }
} }
@ -244,30 +241,26 @@ namespace transport
return session; return session;
} }
void SSUServer::DeleteSession (SSUSession * session) void SSUServer::DeleteSession (std::shared_ptr<SSUSession> session)
{ {
if (session) if (session)
{ {
session->Close (); session->Close ();
m_Sessions.erase (session->GetRemoteEndpoint ()); m_Sessions.erase (session->GetRemoteEndpoint ());
delete session;
} }
} }
void SSUServer::DeleteAllSessions () void SSUServer::DeleteAllSessions ()
{ {
for (auto it: m_Sessions) for (auto it: m_Sessions)
{
it.second->Close (); it.second->Close ();
delete it.second;
}
m_Sessions.clear (); m_Sessions.clear ();
} }
template<typename Filter> template<typename Filter>
SSUSession * SSUServer::GetRandomSession (Filter filter) std::shared_ptr<SSUSession> SSUServer::GetRandomSession (Filter filter)
{ {
std::vector<SSUSession *> filteredSessions; std::vector<std::shared_ptr<SSUSession> > filteredSessions;
for (auto s :m_Sessions) for (auto s :m_Sessions)
if (filter (s.second)) filteredSessions.push_back (s.second); if (filter (s.second)) filteredSessions.push_back (s.second);
if (filteredSessions.size () > 0) if (filteredSessions.size () > 0)
@ -278,10 +271,10 @@ namespace transport
return nullptr; return nullptr;
} }
SSUSession * SSUServer::GetRandomEstablishedSession (const SSUSession * excluded) std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedSession (std::shared_ptr<const SSUSession> excluded)
{ {
return GetRandomSession ( return GetRandomSession (
[excluded](SSUSession * session)->bool [excluded](std::shared_ptr<SSUSession> session)->bool
{ {
return session->GetState () == eSessionStateEstablished && return session->GetState () == eSessionStateEstablished &&
session != excluded; session != excluded;
@ -296,16 +289,16 @@ namespace transport
for (int i = 0; i < maxNumIntroducers; i++) for (int i = 0; i < maxNumIntroducers; i++)
{ {
auto session = GetRandomSession ( auto session = GetRandomSession (
[&ret, ts](SSUSession * session)->bool [&ret, ts](std::shared_ptr<SSUSession> session)->bool
{ {
return session->GetRelayTag () && !ret.count (session) && return session->GetRelayTag () && !ret.count (session.get ()) &&
session->GetState () == eSessionStateEstablished && session->GetState () == eSessionStateEstablished &&
ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION; ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION;
} }
); );
if (session) if (session)
{ {
ret.insert (session); ret.insert (session.get ());
break; break;
} }
} }
@ -315,8 +308,8 @@ namespace transport
void SSUServer::ScheduleIntroducersUpdateTimer () void SSUServer::ScheduleIntroducersUpdateTimer ()
{ {
m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL)); m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL));
m_IntroducersUpdateTimer.async_wait (boost::bind (&SSUServer::HandleIntroducersUpdateTimer, m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
this, boost::asio::placeholders::error)); this, std::placeholders::_1));
} }
void SSUServer::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode) void SSUServer::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode)

16
SSU.h

@ -31,18 +31,18 @@ namespace transport
~SSUServer (); ~SSUServer ();
void Start (); void Start ();
void Stop (); void Stop ();
SSUSession * GetSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false); std::shared_ptr<SSUSession> GetSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false);
SSUSession * FindSession (const i2p::data::RouterInfo * router) const; std::shared_ptr<SSUSession> FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const;
SSUSession * FindSession (const boost::asio::ip::udp::endpoint& e) const; std::shared_ptr<SSUSession> FindSession (const boost::asio::ip::udp::endpoint& e) const;
SSUSession * GetRandomEstablishedSession (const SSUSession * excluded); std::shared_ptr<SSUSession> GetRandomEstablishedSession (std::shared_ptr<const SSUSession> excluded);
void DeleteSession (SSUSession * session); void DeleteSession (std::shared_ptr<SSUSession> session);
void DeleteAllSessions (); void DeleteAllSessions ();
boost::asio::io_service& GetService () { return m_Socket.get_io_service(); }; boost::asio::io_service& GetService () { return m_Socket.get_io_service(); };
const boost::asio::ip::udp::endpoint& GetEndpoint () const { return m_Endpoint; }; const boost::asio::ip::udp::endpoint& GetEndpoint () const { return m_Endpoint; };
void Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to); void Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to);
void AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay); void AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay);
SSUSession * FindRelaySession (uint32_t tag); std::shared_ptr<SSUSession> FindRelaySession (uint32_t tag);
private: private:
@ -54,7 +54,7 @@ namespace transport
void HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred); void HandleReceivedBuffer (boost::asio::ip::udp::endpoint& from, uint8_t * buf, std::size_t bytes_transferred);
template<typename Filter> template<typename Filter>
SSUSession * GetRandomSession (Filter filter); std::shared_ptr<SSUSession> GetRandomSession (Filter filter);
std::set<SSUSession *> FindIntroducers (int maxNumIntroducers); std::set<SSUSession *> FindIntroducers (int maxNumIntroducers);
void ScheduleIntroducersUpdateTimer (); void ScheduleIntroducersUpdateTimer ();
@ -73,7 +73,7 @@ namespace transport
std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to
i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V4> m_ReceiveBuffer; i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V4> m_ReceiveBuffer;
i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V6> m_ReceiveBufferV6; i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V6> m_ReceiveBufferV6;
std::map<boost::asio::ip::udp::endpoint, SSUSession *> m_Sessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions;
std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer
public: public:

5
SSUData.cpp

@ -401,8 +401,9 @@ namespace transport
{ {
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_INTERVAL)); m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_INTERVAL));
m_ResendTimer.async_wait (boost::bind (&SSUData::HandleResendTimer, auto s = m_Session.shared_from_this();
this, boost::asio::placeholders::error)); m_ResendTimer.async_wait ([s](const boost::system::error_code& ecode)
{ s->m_Data.HandleResendTimer (ecode); });
} }
void SSUData::HandleResendTimer (const boost::system::error_code& ecode) void SSUData::HandleResendTimer (const boost::system::error_code& ecode)

30
SSUSession.cpp

@ -109,7 +109,7 @@ namespace transport
else else
{ {
LogPrint (eLogError, "MAC verification failed ", len, " bytes from ", senderEndpoint); LogPrint (eLogError, "MAC verification failed ", len, " bytes from ", senderEndpoint);
m_Server.DeleteSession (this); m_Server.DeleteSession (shared_from_this ());
return; return;
} }
} }
@ -144,13 +144,13 @@ namespace transport
case PAYLOAD_TYPE_SESSION_DESTROYED: case PAYLOAD_TYPE_SESSION_DESTROYED:
{ {
LogPrint (eLogDebug, "SSU session destroy received"); LogPrint (eLogDebug, "SSU session destroy received");
m_Server.DeleteSession (this); // delete this m_Server.DeleteSession (shared_from_this ());
break; break;
} }
case PAYLOAD_TYPE_RELAY_RESPONSE: case PAYLOAD_TYPE_RELAY_RESPONSE:
ProcessRelayResponse (buf, len); ProcessRelayResponse (buf, len);
if (m_State != eSessionStateEstablished) if (m_State != eSessionStateEstablished)
m_Server.DeleteSession (this); m_Server.DeleteSession (shared_from_this ());
break; break;
case PAYLOAD_TYPE_RELAY_REQUEST: case PAYLOAD_TYPE_RELAY_REQUEST:
LogPrint (eLogDebug, "SSU relay request received"); LogPrint (eLogDebug, "SSU relay request received");
@ -459,7 +459,7 @@ namespace transport
buf += 32; // introkey buf += 32; // introkey
uint32_t nonce = be32toh (*(uint32_t *)buf); uint32_t nonce = be32toh (*(uint32_t *)buf);
SendRelayResponse (nonce, from, introKey, session->m_RemoteEndpoint); SendRelayResponse (nonce, from, introKey, session->m_RemoteEndpoint);
SendRelayIntro (session, from); SendRelayIntro (session.get (), from);
} }
} }
@ -711,8 +711,8 @@ namespace transport
{ {
m_Timer.cancel (); m_Timer.cancel ();
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT));
m_Timer.async_wait (boost::bind (&SSUSession::HandleConnectTimer, m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer,
this, boost::asio::placeholders::error)); shared_from_this (), std::placeholders::_1));
} }
void SSUSession::HandleConnectTimer (const boost::system::error_code& ecode) void SSUSession::HandleConnectTimer (const boost::system::error_code& ecode)
@ -731,8 +731,8 @@ namespace transport
{ {
// set connect timer // set connect timer
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT));
m_Timer.async_wait (boost::bind (&SSUSession::HandleConnectTimer, m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer,
this, boost::asio::placeholders::error)); shared_from_this (), std::placeholders::_1));
} }
SendRelayRequest (iTag, iKey); SendRelayRequest (iTag, iKey);
} }
@ -742,8 +742,8 @@ namespace transport
m_State = eSessionStateIntroduced; m_State = eSessionStateIntroduced;
// set connect timer // set connect timer
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT)); m_Timer.expires_from_now (boost::posix_time::seconds(SSU_CONNECT_TIMEOUT));
m_Timer.async_wait (boost::bind (&SSUSession::HandleConnectTimer, m_Timer.async_wait (std::bind (&SSUSession::HandleConnectTimer,
this, boost::asio::placeholders::error)); shared_from_this (), std::placeholders::_1));
} }
void SSUSession::Close () void SSUSession::Close ()
@ -782,7 +782,7 @@ namespace transport
if (m_State != eSessionStateFailed) if (m_State != eSessionStateFailed)
{ {
m_State = eSessionStateFailed; m_State = eSessionStateFailed;
m_Server.DeleteSession (this); // delete this m_Server.DeleteSession (shared_from_this ());
} }
} }
@ -790,8 +790,8 @@ namespace transport
{ {
m_Timer.cancel (); m_Timer.cancel ();
m_Timer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_TIMEOUT)); m_Timer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_TIMEOUT));
m_Timer.async_wait (boost::bind (&SSUSession::HandleTerminationTimer, m_Timer.async_wait (std::bind (&SSUSession::HandleTerminationTimer,
this, boost::asio::placeholders::error)); shared_from_this (), std::placeholders::_1));
} }
void SSUSession::HandleTerminationTimer (const boost::system::error_code& ecode) void SSUSession::HandleTerminationTimer (const boost::system::error_code& ecode)
@ -821,7 +821,7 @@ namespace transport
void SSUSession::SendI2NPMessage (I2NPMessage * msg) void SSUSession::SendI2NPMessage (I2NPMessage * msg)
{ {
m_Server.GetService ().post (boost::bind (&SSUSession::PostI2NPMessage, this, msg)); m_Server.GetService ().post (std::bind (&SSUSession::PostI2NPMessage, shared_from_this (), msg));
} }
void SSUSession::PostI2NPMessage (I2NPMessage * msg) void SSUSession::PostI2NPMessage (I2NPMessage * msg)
@ -897,7 +897,7 @@ namespace transport
else else
{ {
LogPrint (eLogDebug, "SSU peer test from Alice. We are Bob"); LogPrint (eLogDebug, "SSU peer test from Alice. We are Bob");
auto session = m_Server.GetRandomEstablishedSession (this); // charlie auto session = m_Server.GetRandomEstablishedSession (shared_from_this ()); // charlie
if (session) if (session)
session->SendPeerTest (nonce, senderEndpoint.address ().to_v4 ().to_ulong (), session->SendPeerTest (nonce, senderEndpoint.address ().to_v4 ().to_ulong (),
senderEndpoint.port (), introKey, false); senderEndpoint.port (), introKey, false);

4
SSUSession.h

@ -4,7 +4,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <set> #include <set>
#include <list> #include <list>
#include <boost/asio.hpp> #include <memory>
#include "aes.h" #include "aes.h"
#include "hmac.h" #include "hmac.h"
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
@ -50,7 +50,7 @@ namespace transport
}; };
class SSUServer; class SSUServer;
class SSUSession: public TransportSession class SSUSession: public TransportSession, public std::enable_shared_from_this<SSUSession>
{ {
public: public:

6
Transports.cpp

@ -280,7 +280,7 @@ namespace transport
auto r = netdb.FindRouter (ident); auto r = netdb.FindRouter (ident);
if (r) if (r)
{ {
auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (r.get ()) : nullptr; auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (r) : nullptr;
if (ssuSession) if (ssuSession)
ssuSession->SendI2NPMessage (msg); ssuSession->SendI2NPMessage (msg);
else else
@ -337,13 +337,13 @@ namespace transport
delete timer; delete timer;
} }
void Transports::CloseSession (const i2p::data::RouterInfo * router) void Transports::CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
{ {
if (!router) return; if (!router) return;
m_Service.post (boost::bind (&Transports::PostCloseSession, this, router)); m_Service.post (boost::bind (&Transports::PostCloseSession, this, router));
} }
void Transports::PostCloseSession (const i2p::data::RouterInfo * router) void Transports::PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router)
{ {
auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (router) : nullptr; auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (router) : nullptr;
if (ssuSession) // try SSU first if (ssuSession) // try SSU first

4
Transports.h

@ -70,7 +70,7 @@ namespace transport
NTCPSession * FindNTCPSession (const i2p::data::IdentHash& ident); NTCPSession * FindNTCPSession (const i2p::data::IdentHash& ident);
void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void CloseSession (const i2p::data::RouterInfo * router); void CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
private: private:
@ -80,7 +80,7 @@ namespace transport
void HandleResendTimer (const boost::system::error_code& ecode, boost::asio::deadline_timer * timer, void HandleResendTimer (const boost::system::error_code& ecode, boost::asio::deadline_timer * timer,
const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void PostCloseSession (const i2p::data::RouterInfo * router); void PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
void DetectExternalIP (); void DetectExternalIP ();

Loading…
Cancel
Save