Browse Source

intermediate queue for transport sessions. use std::list instead std::vector for multiple I2NP messages

pull/2121/head
orignal 3 weeks ago
parent
commit
23e66671c2
  1. 40
      libi2pd/NTCP2.cpp
  2. 9
      libi2pd/NTCP2.h
  3. 2
      libi2pd/NetDb.cpp
  4. 25
      libi2pd/SSU2Session.cpp
  5. 6
      libi2pd/SSU2Session.h
  6. 8
      libi2pd/TransportSession.h
  7. 36
      libi2pd/Transports.cpp
  8. 7
      libi2pd/Transports.h
  9. 2
      libi2pd/TunnelGateway.cpp

40
libi2pd/NTCP2.cpp

@ -375,6 +375,8 @@ namespace transport
m_Socket.close (); m_Socket.close ();
transports.PeerDisconnected (shared_from_this ()); transports.PeerDisconnected (shared_from_this ());
m_Server.RemoveNTCP2Session (shared_from_this ()); m_Server.RemoveNTCP2Session (shared_from_this ());
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue) for (auto& it: m_SendQueue)
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
@ -1207,7 +1209,7 @@ namespace transport
void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other) void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other)
{ {
if (!other || m_SendQueue.empty ()) return; if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs; std::list<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue) for (auto it: m_SendQueue)
if (!it->IsExpired (ts)) if (!it->IsExpired (ts))
@ -1216,7 +1218,7 @@ namespace transport
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
if (!msgs.empty ()) if (!msgs.empty ())
other->PostI2NPMessages (msgs); other->SendI2NPMessages (msgs);
} }
size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len)
@ -1297,20 +1299,38 @@ namespace transport
m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go m_Server.GetService ().post (std::bind (&NTCP2Session::Terminate, shared_from_this ())); // let termination message go
} }
void NTCP2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) void NTCP2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{ {
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this (), msgs)); if (m_IsTerminated || msgs.empty ()) return;
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&NTCP2Session::PostI2NPMessages, shared_from_this ()));
} }
void NTCP2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs) void NTCP2Session::PostI2NPMessages ()
{ {
if (m_IsTerminated) return; if (m_IsTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2; bool isSemiFull = m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE/2;
for (auto it: msgs) if (isSemiFull)
if (isSemiFull && it->onDrop) {
it->Drop (); // drop earlier because we can handle it for (auto it: msgs)
else if (it->onDrop)
m_SendQueue.push_back (std::move (it)); it->Drop (); // drop earlier because we can handle it
else
m_SendQueue.push_back (std::move (it));
}
else
m_SendQueue.splice (m_SendQueue.end (), msgs);
if (!m_IsSending && m_IsEstablished) if (!m_IsSending && m_IsEstablished)
SendQueue (); SendQueue ();

9
libi2pd/NTCP2.h

@ -153,7 +153,7 @@ namespace transport
void ServerLogin (); // Bob void ServerLogin (); // Bob
void SendLocalRouterInfo (bool update) override; // after handshake or by update void SendLocalRouterInfo (bool update) override; // after handshake or by update
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<NTCP2Session> other); void MoveSendQueue (std::shared_ptr<NTCP2Session> other);
private: private:
@ -196,7 +196,7 @@ namespace transport
void SendRouterInfo (); void SendRouterInfo ();
void SendTermination (NTCP2TerminationReason reason); void SendTermination (NTCP2TerminationReason reason);
void SendTerminationAndTerminate (NTCP2TerminationReason reason); void SendTerminationAndTerminate (NTCP2TerminationReason reason);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs); void PostI2NPMessages ();
private: private:
@ -229,7 +229,10 @@ namespace transport
bool m_IsSending, m_IsReceiving; bool m_IsSending, m_IsReceiving;
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
uint64_t m_NextRouterInfoResendTime; // seconds since epoch uint64_t m_NextRouterInfoResendTime; // seconds since epoch
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
uint16_t m_PaddingSizes[16]; uint16_t m_PaddingSizes[16];
int m_NextPaddingSize; int m_NextPaddingSize;
}; };

2
libi2pd/NetDb.cpp

@ -480,7 +480,7 @@ namespace data
void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills) void NetDb::ReseedFromFloodfill(const RouterInfo & ri, int numRouters, int numFloodfills)
{ {
LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64()); LogPrint(eLogInfo, "NetDB: Reseeding from floodfill ", ri.GetIdentHashBase64());
std::vector<std::shared_ptr<i2p::I2NPMessage> > requests; std::list<std::shared_ptr<i2p::I2NPMessage> > requests;
i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash(); i2p::data::IdentHash ourIdent = i2p::context.GetIdentHash();
i2p::data::IdentHash ih = ri.GetIdentHash(); i2p::data::IdentHash ih = ri.GetIdentHash();

25
libi2pd/SSU2Session.cpp

@ -293,6 +293,8 @@ namespace transport
m_SentHandshakePacket.reset (nullptr); m_SentHandshakePacket.reset (nullptr);
m_SessionConfirmedFragment.reset (nullptr); m_SessionConfirmedFragment.reset (nullptr);
m_PathChallenge.reset (nullptr); m_PathChallenge.reset (nullptr);
if (!m_IntermediateQueue.empty ())
m_SendQueue.splice (m_SendQueue.end (), m_IntermediateQueue);
for (auto& it: m_SendQueue) for (auto& it: m_SendQueue)
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
@ -372,14 +374,27 @@ namespace transport
} }
void SSU2Session::SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) void SSU2Session::SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{ {
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs)); if (m_State == eSSU2SessionStateTerminated || msgs.empty ()) return;
bool empty = false;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
empty = m_IntermediateQueue.empty ();
m_IntermediateQueue.splice (m_IntermediateQueue.end (), msgs);
}
if (empty)
m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this ()));
} }
void SSU2Session::PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs) void SSU2Session::PostI2NPMessages ()
{ {
if (m_State == eSSU2SessionStateTerminated) return; if (m_State == eSSU2SessionStateTerminated) return;
std::list<std::shared_ptr<I2NPMessage> > msgs;
{
std::lock_guard<std::mutex> l(m_IntermediateQueueMutex);
m_IntermediateQueue.swap (msgs);
}
uint64_t mts = i2p::util::GetMonotonicMicroseconds (); uint64_t mts = i2p::util::GetMonotonicMicroseconds ();
bool isSemiFull = false; bool isSemiFull = false;
if (m_SendQueue.size ()) if (m_SendQueue.size ())
@ -415,7 +430,7 @@ namespace transport
void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other) void SSU2Session::MoveSendQueue (std::shared_ptr<SSU2Session> other)
{ {
if (!other || m_SendQueue.empty ()) return; if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs; std::list<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue) for (auto it: m_SendQueue)
if (!it->IsExpired (ts)) if (!it->IsExpired (ts))
@ -424,7 +439,7 @@ namespace transport
it->Drop (); it->Drop ();
m_SendQueue.clear (); m_SendQueue.clear ();
if (!msgs.empty ()) if (!msgs.empty ())
other->PostI2NPMessages (msgs); other->SendI2NPMessages (msgs);
} }
bool SSU2Session::SendQueue () bool SSU2Session::SendQueue ()

6
libi2pd/SSU2Session.h

@ -261,7 +261,7 @@ namespace transport
void FlushData (); void FlushData ();
void Done () override; void Done () override;
void SendLocalRouterInfo (bool update) override; void SendLocalRouterInfo (bool update) override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<SSU2Session> other); void MoveSendQueue (std::shared_ptr<SSU2Session> other);
uint32_t GetRelayTag () const override { return m_RelayTag; }; uint32_t GetRelayTag () const override { return m_RelayTag; };
size_t Resend (uint64_t ts); // return number of resent packets size_t Resend (uint64_t ts); // return number of resent packets
@ -307,7 +307,7 @@ namespace transport
void Established (); void Established ();
void ScheduleConnectTimer (); void ScheduleConnectTimer ();
void HandleConnectTimer (const boost::system::error_code& ecode); void HandleConnectTimer (const boost::system::error_code& ecode);
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs); void PostI2NPMessages ();
bool SendQueue (); // returns true if ack block was sent bool SendQueue (); // returns true if ack block was sent
bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg); bool SendFragmentedMessage (std::shared_ptr<I2NPMessage> msg);
void ResendHandshakePacket (); void ResendHandshakePacket ();
@ -381,6 +381,8 @@ namespace transport
std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice std::unordered_map<uint32_t, std::pair <std::shared_ptr<SSU2Session>, uint64_t > > m_RelaySessions; // nonce->(Alice, timestamp) for Bob or nonce->(Charlie, timestamp) for Alice
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
std::list<std::shared_ptr<I2NPMessage> > m_IntermediateQueue; // from transports
mutable std::mutex m_IntermediateQueueMutex;
bool m_IsDataReceived; bool m_IsDataReceived;
double m_RTT; double m_RTT;
int m_MsgLocalExpirationTimeout; int m_MsgLocalExpirationTimeout;

8
libi2pd/TransportSession.h

@ -144,8 +144,12 @@ namespace transport
void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; }; void SetLastActivityTimestamp (uint64_t ts) { m_LastActivityTimestamp = ts; };
virtual uint32_t GetRelayTag () const { return 0; }; virtual uint32_t GetRelayTag () const { return 0; };
virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); }; virtual void SendLocalRouterInfo (bool update = false)
virtual void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) = 0; {
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
SendI2NPMessages (msgs);
};
virtual void SendI2NPMessages (std::list<std::shared_ptr<I2NPMessage> >& msgs) = 0;
virtual bool IsEstablished () const = 0; virtual bool IsEstablished () const = 0;
private: private:

36
libi2pd/Transports.cpp

@ -450,15 +450,23 @@ namespace transport
void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg) void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg)
{ {
if (m_IsOnline) if (m_IsOnline)
SendMessages (ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > {msg }); SendMessages (ident, { msg });
} }
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs) void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{ {
m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs));
} }
void Transports::PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs) void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs)
{
m_Service->post ([this, ident, msgs = std::move(msgs)] ()
{
PostMessages (ident, msgs);
});
}
void Transports::PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs)
{ {
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
{ {
@ -517,11 +525,16 @@ namespace transport
return; return;
} }
} }
for (auto& it1: msgs) if (sz > MAX_NUM_DELAYED_MESSAGES/2)
if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop) {
it1->Drop (); // drop earlier because we can handle it for (auto& it1: msgs)
else if (it1->onDrop)
peer->delayedMessages.push_back (it1); it1->Drop (); // drop earlier because we can handle it
else
peer->delayedMessages.push_back (it1);
}
else
peer->delayedMessages.splice (peer->delayedMessages.end (), msgs);
} }
else else
{ {
@ -865,7 +878,7 @@ namespace transport
if (it->second->delayedMessages.size () > 0) if (it->second->delayedMessages.size () > 0)
{ {
// check if first message is our DatabaseStore (publishing) // check if first message is our DatabaseStore (publishing)
auto firstMsg = peer->delayedMessages[0]; auto firstMsg = peer->delayedMessages.front ();
if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore &&
i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ())
sendDatabaseStore = false; // we have it in the list already sendDatabaseStore = false; // we have it in the list already
@ -887,7 +900,10 @@ namespace transport
return; return;
} }
if (!session->IsOutgoing ()) // incoming if (!session->IsOutgoing ()) // incoming
session->SendI2NPMessages ({ CreateDatabaseStoreMsg () }); // send DatabaseStore {
std::list<std::shared_ptr<I2NPMessage> > msgs{ CreateDatabaseStoreMsg () };
session->SendI2NPMessages (msgs); // send DatabaseStore
}
auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed
if (r) r->GetProfile ()->Connected (); if (r) r->GetProfile ()->Connected ();
auto ts = i2p::util::GetSecondsSinceEpoch (); auto ts = i2p::util::GetSecondsSinceEpoch ();

7
libi2pd/Transports.h

@ -71,7 +71,7 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo> router; std::shared_ptr<const i2p::data::RouterInfo> router;
std::list<std::shared_ptr<TransportSession> > sessions; std::list<std::shared_ptr<TransportSession> > sessions;
uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime; uint64_t creationTime, nextRouterInfoUpdateTime, lastSelectionTime;
std::vector<std::shared_ptr<i2p::I2NPMessage> > delayedMessages; std::list<std::shared_ptr<i2p::I2NPMessage> > delayedMessages;
std::vector<i2p::data::RouterInfo::SupportedTransports> priority; std::vector<i2p::data::RouterInfo::SupportedTransports> priority;
bool isHighBandwidth, isEligible; bool isHighBandwidth, isEligible;
@ -141,7 +141,8 @@ namespace transport
void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair); void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair);
void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg); void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<std::shared_ptr<i2p::I2NPMessage> >& msgs); void SendMessages (const i2p::data::IdentHash& ident, const std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
void SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs);
void PeerConnected (std::shared_ptr<TransportSession> session); void PeerConnected (std::shared_ptr<TransportSession> session);
void PeerDisconnected (std::shared_ptr<TransportSession> session); void PeerDisconnected (std::shared_ptr<TransportSession> session);
@ -185,7 +186,7 @@ namespace transport
void Run (); void Run ();
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident); void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident); void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, i2p::data::IdentHash ident);
void PostMessages (i2p::data::IdentHash ident, std::vector<std::shared_ptr<i2p::I2NPMessage> > msgs); void PostMessages (i2p::data::IdentHash ident, std::list<std::shared_ptr<i2p::I2NPMessage> > msgs);
bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr<Peer> peer);
void SetPriority (std::shared_ptr<Peer> peer) const; void SetPriority (std::shared_ptr<Peer> peer) const;
void HandlePeerCleanupTimer (const boost::system::error_code& ecode); void HandlePeerCleanupTimer (const boost::system::error_code& ecode);

2
libi2pd/TunnelGateway.cpp

@ -221,7 +221,7 @@ namespace tunnel
void TunnelGateway::SendBuffer () void TunnelGateway::SendBuffer ()
{ {
m_Buffer.CompleteCurrentTunnelDataMessage (); m_Buffer.CompleteCurrentTunnelDataMessage ();
std::vector<std::shared_ptr<I2NPMessage> > newTunnelMsgs; std::list<std::shared_ptr<I2NPMessage> > newTunnelMsgs;
const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs ();
for (auto& tunnelMsg : tunnelDataMsgs) for (auto& tunnelMsg : tunnelDataMsgs)
{ {

Loading…
Cancel
Save