Browse Source

send tunnel participant data to transport session directly. Implemented TunnelTransportSender

pull/2094/merge
orignal 7 days ago
parent
commit
e76d09e1a1
  1. 2
      libi2pd/NetDb.cpp
  2. 3
      libi2pd/TransitTunnel.cpp
  3. 1
      libi2pd/TransitTunnel.h
  4. 7
      libi2pd/Transports.cpp
  5. 1
      libi2pd/Transports.h
  6. 64
      libi2pd/TunnelBase.cpp
  7. 25
      libi2pd/TunnelBase.h
  8. 35
      libi2pd/TunnelGateway.cpp
  9. 5
      libi2pd/TunnelGateway.h

2
libi2pd/NetDb.cpp

@ -502,7 +502,7 @@ namespace data @@ -502,7 +502,7 @@ namespace data
}
// send them off
i2p::transport::transports.SendMessages(ih, requests);
i2p::transport::transports.SendMessages(ih, std::move (requests));
}
bool NetDb::LoadRouterInfo (const std::string& path, uint64_t ts)

3
libi2pd/TransitTunnel.cpp

@ -62,7 +62,8 @@ namespace tunnel @@ -62,7 +62,8 @@ namespace tunnel
auto num = m_TunnelDataMsgs.size ();
if (num > 1)
LogPrint (eLogDebug, "TransitTunnel: ", GetTunnelID (), "->", GetNextTunnelID (), " ", num);
i2p::transport::transports.SendMessages (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear
if (!m_Sender) m_Sender = std::make_unique<TunnelTransportSender>();
m_Sender->SendMessagesTo (GetNextIdentHash (), m_TunnelDataMsgs); // send and clear
}
}

1
libi2pd/TransitTunnel.h

@ -63,6 +63,7 @@ namespace tunnel @@ -63,6 +63,7 @@ namespace tunnel
size_t m_NumTransmittedBytes;
std::list<std::shared_ptr<i2p::I2NPMessage> > m_TunnelDataMsgs;
std::unique_ptr<TunnelTransportSender> m_Sender;
};
class TransitTunnelGateway: public TransitTunnel

7
libi2pd/Transports.cpp

@ -454,13 +454,6 @@ namespace transport @@ -454,13 +454,6 @@ namespace transport
return {}; // invalid future
}
std::future<std::shared_ptr<TransportSession> > Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs)
{
std::list<std::shared_ptr<i2p::I2NPMessage> > msgs1;
msgs.swap (msgs1);
return SendMessages (ident, std::move (msgs1));
}
std::future<std::shared_ptr<TransportSession> > Transports::SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs)
{
return boost::asio::post (*m_Service, boost::asio::use_future ([this, ident, msgs = std::move(msgs)] () mutable

1
libi2pd/Transports.h

@ -145,7 +145,6 @@ namespace transport @@ -145,7 +145,6 @@ namespace transport
void ReuseX25519KeysPair (std::shared_ptr<i2p::crypto::X25519Keys> pair);
std::future<std::shared_ptr<TransportSession> > SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr<i2p::I2NPMessage> msg);
std::future<std::shared_ptr<TransportSession> > SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >& msgs);
std::future<std::shared_ptr<TransportSession> > SendMessages (const i2p::data::IdentHash& ident, std::list<std::shared_ptr<i2p::I2NPMessage> >&& msgs);
void PeerConnected (std::shared_ptr<TransportSession> session);

64
libi2pd/TunnelBase.cpp

@ -0,0 +1,64 @@ @@ -0,0 +1,64 @@
/*
* Copyright (c) 2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*
*/
#include "Transports.h"
#include "TunnelBase.h"
namespace i2p
{
namespace tunnel
{
void TunnelTransportSender::SendMessagesTo (const i2p::data::IdentHash& to,
std::list<std::shared_ptr<I2NPMessage> >&& msgs)
{
if (msgs.empty ()) return;
auto currentTransport = m_CurrentTransport.lock ();
if (!currentTransport)
{
// try to obtain transport from pending request or send thought transport is not complete
if (m_PendingTransport.valid ()) // pending request?
{
if (m_PendingTransport.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
// pending request complete
currentTransport = m_PendingTransport.get (); // take transports used in pending request
if (currentTransport)
{
if (currentTransport->IsEstablished ())
m_CurrentTransport = currentTransport;
else
currentTransport = nullptr;
}
}
else // still pending
{
// send through transports, but don't update pending transport
i2p::transport::transports.SendMessages (to, std::move (msgs));
return;
}
}
}
if (currentTransport) // session is good
// send to session directly
currentTransport->SendI2NPMessages (msgs);
else // no session yet
// send through transports
m_PendingTransport = i2p::transport::transports.SendMessages (to, std::move (msgs));
}
void TunnelTransportSender::SendMessagesTo (const i2p::data::IdentHash& to,
std::list<std::shared_ptr<I2NPMessage> >& msgs)
{
std::list<std::shared_ptr<i2p::I2NPMessage> > msgs1;
msgs.swap (msgs1);
SendMessagesTo (to, std::move (msgs1));
}
}
}

25
libi2pd/TunnelBase.h

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
* Copyright (c) 2013-2024, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
@ -11,12 +11,19 @@ @@ -11,12 +11,19 @@
#include <inttypes.h>
#include <memory>
#include <future>
#include <list>
#include "Timestamp.h"
#include "I2NPProtocol.h"
#include "Identity.h"
namespace i2p
{
namespace transport
{
class TransportSession;
}
namespace tunnel
{
const size_t TUNNEL_DATA_MSG_SIZE = 1028;
@ -76,6 +83,22 @@ namespace tunnel @@ -76,6 +83,22 @@ namespace tunnel
return t1 < t2;
}
};
class TunnelTransportSender final
{
public:
TunnelTransportSender () = default;
~TunnelTransportSender () = default;
void SendMessagesTo (const i2p::data::IdentHash& to, std::list<std::shared_ptr<I2NPMessage> >&& msgs);
void SendMessagesTo (const i2p::data::IdentHash& to, std::list<std::shared_ptr<I2NPMessage> >& msgs); // send and clear
private:
std::weak_ptr<i2p::transport::TransportSession> m_CurrentTransport;
std::future<std::shared_ptr<i2p::transport::TransportSession> > m_PendingTransport;
};
}
}

35
libi2pd/TunnelGateway.cpp

@ -235,40 +235,9 @@ namespace tunnel @@ -235,40 +235,9 @@ namespace tunnel
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
}
m_Buffer.ClearTunnelDataMsgs ();
// send
auto currentTransport = m_CurrentTransport.lock ();
if (!currentTransport)
{
// try to obtain transport from pending request or send thought transport is not complete
if (m_PendingTransport.valid ()) // pending request?
{
if (m_PendingTransport.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
// pending request complete
currentTransport = m_PendingTransport.get (); // take transports used in pending request
if (currentTransport)
{
if (currentTransport->IsEstablished ())
m_CurrentTransport = currentTransport;
else
currentTransport = nullptr;
}
}
else // still pending
{
// send through transports, but don't update pending transport
i2p::transport::transports.SendMessages (m_Tunnel.GetNextIdentHash (), std::move (newTunnelMsgs));
return;
}
}
}
if (currentTransport) // session is good
// send to session directly
currentTransport->SendI2NPMessages (newTunnelMsgs);
else // no session yet
// send through transports
m_PendingTransport = i2p::transport::transports.SendMessages (m_Tunnel.GetNextIdentHash (), std::move (newTunnelMsgs));
if (!m_Sender) m_Sender = std::make_unique<TunnelTransportSender>();
m_Sender->SendMessagesTo (m_Tunnel.GetNextIdentHash (), std::move (newTunnelMsgs));
}
}
}

5
libi2pd/TunnelGateway.h

@ -12,9 +12,7 @@ @@ -12,9 +12,7 @@
#include <inttypes.h>
#include <vector>
#include <memory>
#include <future>
#include "I2NPProtocol.h"
#include "TransportSession.h"
#include "TunnelBase.h"
namespace i2p
@ -59,8 +57,7 @@ namespace tunnel @@ -59,8 +57,7 @@ namespace tunnel
TunnelBase& m_Tunnel;
TunnelGatewayBuffer m_Buffer;
size_t m_NumSentBytes;
std::weak_ptr<i2p::transport::TransportSession> m_CurrentTransport;
std::future<std::shared_ptr<i2p::transport::TransportSession> > m_PendingTransport;
std::unique_ptr<TunnelTransportSender> m_Sender;
};
}
}

Loading…
Cancel
Save