Browse Source

fixed race condition

pull/158/head
orignal 10 years ago
parent
commit
ab0bd908ec
  1. 15
      SSU.cpp
  2. 2
      SSU.h
  3. 18
      SSUSession.cpp
  4. 2
      SSUSession.h

15
SSU.cpp

@ -161,7 +161,10 @@ namespace transport
{ {
session = std::make_shared<SSUSession> (*this, from); session = std::make_shared<SSUSession> (*this, from);
session->WaitForConnect (); session->WaitForConnect ();
m_Sessions[from] = session; {
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[from] = session;
}
LogPrint ("New SSU session from ", from.address ().to_string (), ":", from.port (), " created"); LogPrint ("New SSU session from ", from.address ().to_string (), ":", from.port (), " created");
} }
session->ProcessNextMessage (buf, bytes_transferred, from); session->ProcessNextMessage (buf, bytes_transferred, from);
@ -206,8 +209,10 @@ namespace transport
{ {
// otherwise create new session // otherwise create new session
session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest); session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
m_Sessions[remoteEndpoint] = session; {
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[remoteEndpoint] = session;
}
if (!router->UsesIntroducer ()) if (!router->UsesIntroducer ())
{ {
// connect directly // connect directly
@ -243,6 +248,7 @@ namespace transport
introducer = &(address->introducers[0]); // TODO: introducer = &(address->introducers[0]); // TODO:
boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort); boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort);
introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router); introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router);
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[introducerEndpoint] = introducerSession; m_Sessions[introducerEndpoint] = introducerSession;
} }
// introduce // introduce
@ -256,6 +262,7 @@ namespace transport
else else
{ {
LogPrint (eLogWarning, "Can't connect to unreachable router. No introducers presented"); LogPrint (eLogWarning, "Can't connect to unreachable router. No introducers presented");
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions.erase (remoteEndpoint); m_Sessions.erase (remoteEndpoint);
session.reset (); session.reset ();
} }
@ -273,12 +280,14 @@ namespace transport
if (session) if (session)
{ {
session->Close (); session->Close ();
std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions.erase (session->GetRemoteEndpoint ()); m_Sessions.erase (session->GetRemoteEndpoint ());
} }
} }
void SSUServer::DeleteAllSessions () void SSUServer::DeleteAllSessions ()
{ {
std::unique_lock<std::mutex> l(m_SessionsMutex);
for (auto it: m_Sessions) for (auto it: m_Sessions)
it.second->Close (); it.second->Close ();
m_Sessions.clear (); m_Sessions.clear ();

2
SSU.h

@ -7,6 +7,7 @@
#include <list> #include <list>
#include <set> #include <set>
#include <thread> #include <thread>
#include <mutex>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "aes.h" #include "aes.h"
#include "I2PEndian.h" #include "I2PEndian.h"
@ -75,6 +76,7 @@ namespace transport
std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to std::list<boost::asio::ip::udp::endpoint> m_Introducers; // introducers we are connected to
i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V4> m_ReceiveBuffer; i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V4> m_ReceiveBuffer;
i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V6> m_ReceiveBufferV6; i2p::crypto::AESAlignedBuffer<2*SSU_MTU_V6> m_ReceiveBufferV6;
std::mutex m_SessionsMutex;
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions;
std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer std::map<uint32_t, boost::asio::ip::udp::endpoint> m_Relays; // we are introducer

18
SSUSession.cpp

@ -16,7 +16,7 @@ namespace transport
SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint, SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint,
std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest ): TransportSession (router), std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest ): TransportSession (router),
m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_Server (server), m_RemoteEndpoint (remoteEndpoint),
m_Timer (m_Server.GetService ()), m_PeerTest (peerTest), m_Timer (GetService ()), m_PeerTest (peerTest),
m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0), m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0),
m_Data (*this), m_NumSentBytes (0), m_NumReceivedBytes (0) m_Data (*this), m_NumSentBytes (0), m_NumReceivedBytes (0)
{ {
@ -27,6 +27,11 @@ namespace transport
{ {
} }
boost::asio::io_service& SSUSession::GetService ()
{
return IsV6 () ? m_Server.GetServiceV6 () : m_Server.GetService ();
}
void SSUSession::CreateAESandMacKey (const uint8_t * pubKey) void SSUSession::CreateAESandMacKey (const uint8_t * pubKey)
{ {
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
@ -755,14 +760,15 @@ namespace transport
void SSUSession::Close () void SSUSession::Close ()
{ {
m_State = eSessionStateClosed;
SendSesionDestroyed (); SendSesionDestroyed ();
transports.PeerDisconnected (shared_from_this ()); transports.PeerDisconnected (shared_from_this ());
m_Timer.cancel ();
} }
void SSUSession::Done () void SSUSession::Done ()
{ {
boost::asio::io_service& service = IsV6 () ? m_Server.GetServiceV6 () : m_Server.GetService (); GetService ().post (std::bind (&SSUSession::Failed, shared_from_this ()));
service.post (std::bind (&SSUSession::Failed, shared_from_this ()));
} }
void SSUSession::Established () void SSUSession::Established ()
@ -824,8 +830,7 @@ namespace transport
void SSUSession::SendI2NPMessage (I2NPMessage * msg) void SSUSession::SendI2NPMessage (I2NPMessage * msg)
{ {
boost::asio::io_service& service = IsV6 () ? m_Server.GetServiceV6 () : m_Server.GetService (); GetService ().post (std::bind (&SSUSession::PostI2NPMessage, shared_from_this (), msg));
service.post (std::bind (&SSUSession::PostI2NPMessage, shared_from_this (), msg));
} }
void SSUSession::PostI2NPMessage (I2NPMessage * msg) void SSUSession::PostI2NPMessage (I2NPMessage * msg)
@ -836,8 +841,7 @@ namespace transport
void SSUSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs) void SSUSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
{ {
boost::asio::io_service& service = IsV6 () ? m_Server.GetServiceV6 () : m_Server.GetService (); GetService ().post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs));
service.post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs));
} }
void SSUSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs) void SSUSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)

2
SSUSession.h

@ -45,6 +45,7 @@ namespace transport
eSessionStateUnknown, eSessionStateUnknown,
eSessionStateIntroduced, eSessionStateIntroduced,
eSessionStateEstablished, eSessionStateEstablished,
eSessionStateClosed,
eSessionStateFailed eSessionStateFailed
}; };
@ -80,6 +81,7 @@ namespace transport
private: private:
boost::asio::io_service& GetService ();
void CreateAESandMacKey (const uint8_t * pubKey); void CreateAESandMacKey (const uint8_t * pubKey);
void PostI2NPMessage (I2NPMessage * msg); void PostI2NPMessage (I2NPMessage * msg);

Loading…
Cancel
Save