Browse Source

remember last outgoing tunnel per stream rather than per client destination

pull/113/head
orignal 10 years ago
parent
commit
aca87b5fd1
  1. 8
      Datagram.cpp
  2. 22
      Destination.cpp
  3. 4
      Destination.h
  4. 24
      Streaming.cpp
  5. 1
      Streaming.h

8
Datagram.cpp

@ -46,8 +46,9 @@ namespace datagram
void DatagramDestination::SendMsg (I2NPMessage * msg, const i2p::data::LeaseSet& remote) void DatagramDestination::SendMsg (I2NPMessage * msg, const i2p::data::LeaseSet& remote)
{ {
auto outboundTunnel = m_Owner.GetTunnelPool ()->GetNextOutboundTunnel ();
auto leases = remote.GetNonExpiredLeases (); auto leases = remote.GetNonExpiredLeases ();
if (!leases.empty ()) if (!leases.empty () && outboundTunnel)
{ {
std::vector<i2p::tunnel::TunnelMessageBlock> msgs; std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1); uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1);
@ -58,11 +59,14 @@ namespace datagram
leases[i].tunnelGateway, leases[i].tunnelID, leases[i].tunnelGateway, leases[i].tunnelID,
garlic garlic
}); });
m_Owner.SendTunnelDataMsgs (msgs); outboundTunnel->SendTunnelDataMsg (msgs);
} }
else else
{ {
if (outboundTunnel)
LogPrint (eLogWarning, "Failed to send datagram. All leases expired"); LogPrint (eLogWarning, "Failed to send datagram. All leases expired");
else
LogPrint (eLogWarning, "Failed to send datagram. No outbound tunnels");
DeleteI2NPMessage (msg); DeleteI2NPMessage (msg);
} }
} }

22
Destination.cpp

@ -12,8 +12,7 @@ namespace client
{ {
ClientDestination::ClientDestination (bool isPublic, i2p::data::SigningKeyType sigType): ClientDestination::ClientDestination (bool isPublic, i2p::data::SigningKeyType sigType):
m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_DatagramDestination (nullptr)
m_DatagramDestination (nullptr)
{ {
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (sigType); m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (sigType);
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
@ -26,8 +25,7 @@ namespace client
ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic): ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_DatagramDestination (nullptr)
m_DatagramDestination (nullptr)
{ {
std::ifstream s(fullPath.c_str (), std::ifstream::binary); std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ()) if (s.is_open ())
@ -63,8 +61,7 @@ namespace client
ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic): ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic), m_DatagramDestination (nullptr)
m_DatagramDestination (nullptr)
{ {
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
@ -175,19 +172,6 @@ namespace client
} }
} }
void ClientDestination::SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs)
{
m_CurrentOutboundTunnel = m_Pool->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
if (m_CurrentOutboundTunnel)
m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
else
{
LogPrint ("No outbound tunnels in the pool");
for (auto it: msgs)
DeleteI2NPMessage (it.data);
}
}
void ClientDestination::ProcessGarlicMessage (I2NPMessage * msg) void ClientDestination::ProcessGarlicMessage (I2NPMessage * msg)
{ {
m_Service->post (std::bind (&ClientDestination::HandleGarlicMessage, this, msg)); m_Service->post (std::bind (&ClientDestination::HandleGarlicMessage, this, msg));

4
Destination.h

@ -35,10 +35,7 @@ namespace client
boost::asio::io_service * GetService () { return m_Service; }; boost::asio::io_service * GetService () { return m_Service; };
i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; }; i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; };
bool IsReady () const { return m_LeaseSet && m_LeaseSet->HasNonExpiredLeases (); }; bool IsReady () const { return m_LeaseSet && m_LeaseSet->HasNonExpiredLeases (); };
void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; };
const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident); const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident);
void SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs);
// streaming // streaming
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
@ -85,7 +82,6 @@ namespace client
std::map<i2p::data::IdentHash, i2p::data::LeaseSet *> m_RemoteLeaseSets; std::map<i2p::data::IdentHash, i2p::data::LeaseSet *> m_RemoteLeaseSets;
i2p::tunnel::TunnelPool * m_Pool; i2p::tunnel::TunnelPool * m_Pool;
i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel;
i2p::data::LeaseSet * m_LeaseSet; i2p::data::LeaseSet * m_LeaseSet;
bool m_IsPublic; bool m_IsPublic;

24
Streaming.cpp

@ -15,9 +15,9 @@ namespace stream
const i2p::data::LeaseSet& remote, int port): m_Service (service), m_SendStreamID (0), const i2p::data::LeaseSet& remote, int port): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false),
m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (&remote), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_RemoteLeaseSet (&remote), m_RoutingSession (nullptr), m_CurrentOutboundTunnel (nullptr),
m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumReceivedBytes (0), m_Port (port) m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -26,9 +26,9 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): Stream::Stream (boost::asio::io_service& service, StreamingDestination& local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_CurrentOutboundTunnel (nullptr),
m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumReceivedBytes (0), m_Port (0) m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
} }
@ -116,7 +116,7 @@ namespace stream
{ {
// we have received duplicate. Most likely our outbound tunnel is dead // we have received duplicate. Most likely our outbound tunnel is dead
LogPrint ("Duplicate message ", receivedSeqn, " received"); LogPrint ("Duplicate message ", receivedSeqn, " received");
m_LocalDestination.GetOwner ().ResetCurrentOutboundTunnel (); // pick another outbound tunnel m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped delete packet; // packet dropped
@ -432,6 +432,12 @@ namespace stream
return; return;
} }
} }
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ().GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
if (!m_CurrentOutboundTunnel)
{
LogPrint ("No outbound tunnels in the pool");
return;
}
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (ts >= m_CurrentRemoteLease.endDate) if (ts >= m_CurrentRemoteLease.endDate)
@ -450,7 +456,7 @@ namespace stream
}); });
m_NumSentBytes += it->GetLength (); m_NumSentBytes += it->GetLength ();
} }
m_LocalDestination.GetOwner ().SendTunnelDataMsgs (msgs); m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
} }
else else
LogPrint ("All leases are expired"); LogPrint ("All leases are expired");
@ -484,7 +490,7 @@ namespace stream
} }
if (packets.size () > 0) if (packets.size () > 0)
{ {
m_LocalDestination.GetOwner ().ResetCurrentOutboundTunnel (); // pick another outbound tunnel m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
SendPackets (packets); SendPackets (packets);
} }

1
Streaming.h

@ -141,6 +141,7 @@ namespace stream
const i2p::data::LeaseSet * m_RemoteLeaseSet; const i2p::data::LeaseSet * m_RemoteLeaseSet;
i2p::garlic::GarlicRoutingSession * m_RoutingSession; i2p::garlic::GarlicRoutingSession * m_RoutingSession;
i2p::data::Lease m_CurrentRemoteLease; i2p::data::Lease m_CurrentRemoteLease;
i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel;
std::queue<Packet *> m_ReceiveQueue; std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
std::set<Packet *, PacketCmp> m_SentPackets; std::set<Packet *, PacketCmp> m_SentPackets;

Loading…
Cancel
Save