Browse Source

support multiple transport sessions to the same peer

pull/200/head
orignal 10 years ago
parent
commit
e461982a31
  1. 40
      Transports.cpp
  2. 8
      Transports.h

40
Transports.cpp

@ -239,7 +239,7 @@ namespace transport @@ -239,7 +239,7 @@ namespace transport
try
{
auto r = netdb.FindRouter (ident);
it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, { 0, r, nullptr,
it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, { 0, r, {},
i2p::util::GetSecondsSinceEpoch () })).first;
connected = ConnectToPeer (ident, it->second);
}
@ -254,8 +254,8 @@ namespace transport @@ -254,8 +254,8 @@ namespace transport
return;
}
}
if (it->second.session)
it->second.session->SendI2NPMessages (msgs);
if (!it->second.sessions.empty ())
it->second.sessions.front ()->SendI2NPMessages (msgs);
else
{
for (auto it1: msgs)
@ -309,7 +309,7 @@ namespace transport @@ -309,7 +309,7 @@ namespace transport
}
}
LogPrint (eLogError, "No NTCP and SSU addresses available");
if (peer.session) peer.session->Done ();
peer.Done ();
m_Peers.erase (ident);
return false;
}
@ -436,20 +436,12 @@ namespace transport @@ -436,20 +436,12 @@ namespace transport
auto it = m_Peers.find (ident);
if (it != m_Peers.end ())
{
if (!it->second.session)
{
it->second.session = session;
session->SendI2NPMessages (it->second.delayedMessages);
it->second.delayedMessages.clear ();
}
else
{
LogPrint (eLogError, "Session for ", ident.ToBase64 ().substr (0, 4), " already exists");
session->Done ();
}
it->second.sessions.push_back (session);
session->SendI2NPMessages (it->second.delayedMessages);
it->second.delayedMessages.clear ();
}
else // incoming connection
m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, session, i2p::util::GetSecondsSinceEpoch () }));
m_Peers.insert (std::make_pair (ident, Peer{ 0, nullptr, { session }, i2p::util::GetSecondsSinceEpoch () }));
});
}
@ -459,12 +451,16 @@ namespace transport @@ -459,12 +451,16 @@ namespace transport
{
auto ident = session->GetRemoteIdentity ().GetIdentHash ();
auto it = m_Peers.find (ident);
if (it != m_Peers.end () && (!it->second.session || it->second.session == session))
if (it != m_Peers.end ())
{
if (it->second.delayedMessages.size () > 0)
ConnectToPeer (ident, it->second);
else
m_Peers.erase (it);
it->second.sessions.remove (session);
if (it->second.sessions.empty ()) // TODO: why?
{
if (it->second.delayedMessages.size () > 0)
ConnectToPeer (ident, it->second);
else
m_Peers.erase (it);
}
}
});
}
@ -482,7 +478,7 @@ namespace transport @@ -482,7 +478,7 @@ namespace transport
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it = m_Peers.begin (); it != m_Peers.end (); )
{
if (!it->second.session && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT)
if (it->second.sessions.empty () && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT)
{
LogPrint (eLogError, "Session to peer ", it->first.ToBase64 (), " has not been created in ", SESSION_CREATION_TIMEOUT, " seconds");
it = m_Peers.erase (it);

8
Transports.h

@ -60,10 +60,16 @@ namespace transport @@ -60,10 +60,16 @@ namespace transport
{
int numAttempts;
std::shared_ptr<const i2p::data::RouterInfo> router;
std::shared_ptr<TransportSession> session;
std::list<std::shared_ptr<TransportSession> > sessions;
uint64_t creationTime;
std::vector<i2p::I2NPMessage *> delayedMessages;
void Done ()
{
for (auto it: sessions)
it->Done ();
}
~Peer ()
{
for (auto it :delayedMessages)

Loading…
Cancel
Save