1
0
mirror of https://github.com/PurpleI2P/i2pd.git synced 2025-01-22 04:04:16 +00:00

Merge pull request #18 from orignal/master

Merge pull request from orignal/master
This commit is contained in:
chertov 2014-03-24 06:37:19 +03:00
commit d44cfc73b8
12 changed files with 237 additions and 80 deletions

View File

@ -16,7 +16,7 @@ namespace garlic
{ {
GarlicRoutingSession::GarlicRoutingSession (const i2p::data::RoutingDestination& destination, int numTags): GarlicRoutingSession::GarlicRoutingSession (const i2p::data::RoutingDestination& destination, int numTags):
m_Destination (destination), m_FirstMsgID (0), m_IsAcknowledged (false), m_Destination (destination), m_FirstMsgID (0), m_IsAcknowledged (false),
m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0) m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0), m_TagsCreationTime (0)
{ {
// create new session tags and session key // create new session tags and session key
m_Rnd.GenerateBlock (m_SessionKey, 32); m_Rnd.GenerateBlock (m_SessionKey, 32);
@ -40,6 +40,8 @@ namespace garlic
{ {
for (int i = 0; i < m_NumTags; i++) for (int i = 0; i < m_NumTags; i++)
m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); m_Rnd.GenerateBlock (m_SessionTags + i*32, 32);
m_TagsCreationTime = i2p::util::GetSecondsSinceEpoch ();
SetAcknowledged (false);
} }
} }
@ -48,6 +50,24 @@ namespace garlic
I2NPMessage * m = NewI2NPMessage (); I2NPMessage * m = NewI2NPMessage ();
size_t len = 0; size_t len = 0;
uint8_t * buf = m->GetPayload () + 4; // 4 bytes for length uint8_t * buf = m->GetPayload () + 4; // 4 bytes for length
// take care about tags
if (m_NumTags > 0)
{
if (i2p::util::GetSecondsSinceEpoch () >= m_TagsCreationTime + TAGS_EXPIRATION_TIMEOUT)
{
// old tags expired create new set
LogPrint ("Garlic tags expired");
GenerateSessionTags ();
m_NextTag = -1;
}
else if (!m_IsAcknowledged) // new set of tags was not acknowledged
{
LogPrint ("Previous garlic tags was not acknowledged. Use ElGamal");
m_NextTag = -1; // have to use ElGamal
}
}
// create message
if (m_NextTag < 0 || !m_NumTags) // new session if (m_NextTag < 0 || !m_NumTags) // new session
{ {
// create ElGamal block // create ElGamal block
@ -253,7 +273,7 @@ namespace garlic
session = it->second; session = it->second;
if (!session) if (!session)
{ {
session = new GarlicRoutingSession (destination, 4); // TODO: change it later session = new GarlicRoutingSession (destination, 32);
m_Sessions[destination.GetIdentHash ()] = session; m_Sessions[destination.GetIdentHash ()] = session;
} }
@ -372,7 +392,7 @@ namespace garlic
// later on we should let destination decide // later on we should let destination decide
I2NPHeader * header = (I2NPHeader *)buf; I2NPHeader * header = (I2NPHeader *)buf;
if (header->typeID == eI2NPData) if (header->typeID == eI2NPData)
i2p::stream::HandleDataMessage (&destination, buf + sizeof (I2NPHeader), be16toh (header->size)); i2p::stream::HandleDataMessage (destination, buf + sizeof (I2NPHeader), be16toh (header->size));
else else
LogPrint ("Unexpected I2NP garlic message ", (int)header->typeID); LogPrint ("Unexpected I2NP garlic message ", (int)header->typeID);
break; break;

View File

@ -35,7 +35,7 @@ namespace garlic
}; };
#pragma pack() #pragma pack()
const int TAGS_EXPIRATION_TIMEOUT = 900; // 15 minutes
class GarlicRoutingSession class GarlicRoutingSession
{ {
public: public:
@ -66,6 +66,7 @@ namespace garlic
bool m_IsAcknowledged; bool m_IsAcknowledged;
int m_NumTags, m_NextTag; int m_NumTags, m_NextTag;
uint8_t * m_SessionTags; // m_NumTags*32 bytes uint8_t * m_SessionTags; // m_NumTags*32 bytes
uint32_t m_TagsCreationTime; // seconds since epoch
CryptoPP::CBC_Mode<CryptoPP::AES>::Encryption m_Encryption; CryptoPP::CBC_Mode<CryptoPP::AES>::Encryption m_Encryption;
CryptoPP::AutoSeededRandomPool m_Rnd; CryptoPP::AutoSeededRandomPool m_Rnd;

View File

@ -141,6 +141,8 @@ namespace util
it->GetTunnelConfig ()->Print (s); it->GetTunnelConfig ()->Print (s);
if (it->GetTunnelPool ()) if (it->GetTunnelPool ())
s << " " << "Pool"; s << " " << "Pool";
if (it->IsFailed ())
s << " " << "Failed";
s << " " << (int)it->GetNumSentBytes () << "<BR>"; s << " " << (int)it->GetNumSentBytes () << "<BR>";
} }
@ -149,6 +151,8 @@ namespace util
it.second->GetTunnelConfig ()->Print (s); it.second->GetTunnelConfig ()->Print (s);
if (it.second->GetTunnelPool ()) if (it.second->GetTunnelPool ())
s << " " << "Pool"; s << " " << "Pool";
if (it.second->IsFailed ())
s << " " << "Failed";
s << " " << (int)it.second->GetNumReceivedBytes () << "<BR>"; s << " " << (int)it.second->GetNumReceivedBytes () << "<BR>";
} }

View File

@ -57,13 +57,13 @@ namespace data
LogPrint ("LeaseSet verification failed"); LogPrint ("LeaseSet verification failed");
} }
std::set<Lease> LeaseSet::GetNonExpiredLeases () const const std::vector<Lease> LeaseSet::GetNonExpiredLeases () const
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::set<Lease> leases; std::vector<Lease> leases;
for (auto& it: m_Leases) for (auto& it: m_Leases)
if (ts < it.endDate) if (ts < it.endDate)
leases.insert (it); leases.push_back (it);
return leases; return leases;
} }

View File

@ -4,7 +4,6 @@
#include <inttypes.h> #include <inttypes.h>
#include <string.h> #include <string.h>
#include <vector> #include <vector>
#include <set>
#include "Identity.h" #include "Identity.h"
namespace i2p namespace i2p
@ -43,7 +42,7 @@ namespace data
const Identity& GetIdentity () const { return m_Identity; }; const Identity& GetIdentity () const { return m_Identity; };
const IdentHash& GetIdentHash () const { return m_IdentHash; }; const IdentHash& GetIdentHash () const { return m_IdentHash; };
const std::vector<Lease>& GetLeases () const { return m_Leases; }; const std::vector<Lease>& GetLeases () const { return m_Leases; };
std::set<Lease> GetNonExpiredLeases () const; const std::vector<Lease> GetNonExpiredLeases () const;
bool HasExpiredLeases () const; bool HasExpiredLeases () const;
bool HasNonExpiredLeases () const; bool HasNonExpiredLeases () const;
const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionKey; }; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionKey; };

View File

@ -42,6 +42,11 @@ namespace data
return msg; return msg;
} }
void RequestedDestination::ClearExcludedPeers ()
{
m_ExcludedPeers.clear ();
}
#ifndef _WIN32 #ifndef _WIN32
const char NetDb::m_NetDbPath[] = "/netDb"; const char NetDb::m_NetDbPath[] = "/netDb";
#else #else
@ -335,19 +340,26 @@ namespace data
if (inbound) if (inbound)
{ {
RequestedDestination * dest = CreateRequestedDestination (destination, isLeaseSet); RequestedDestination * dest = CreateRequestedDestination (destination, isLeaseSet);
auto floodfill = GetClosestFloodfill (destination, dest->GetExcludedPeers ()); std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
if (floodfill) // request 3 closests floodfills
for (int i = 0; i < 3; i++)
{ {
std::vector<i2p::tunnel::TunnelMessageBlock> msgs; auto floodfill = GetClosestFloodfill (destination, dest->GetExcludedPeers ());
// DatabaseLookup message if (floodfill)
{
// DatabaseLookup message
msgs.push_back (i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeRouter,
floodfill->GetIdentHash (), 0,
dest->CreateRequestMessage (floodfill, inbound)
});
}
}
if (msgs.size () > 0)
{
dest->ClearExcludedPeers ();
dest->SetLastOutboundTunnel (outbound); dest->SetLastOutboundTunnel (outbound);
msgs.push_back (i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeRouter,
floodfill->GetIdentHash (), 0,
dest->CreateRequestMessage (floodfill, inbound)
});
outbound->SendTunnelDataMsg (msgs); outbound->SendTunnelDataMsg (msgs);
} }
else else

View File

@ -30,7 +30,8 @@ namespace data
const IdentHash& GetDestination () const { return m_Destination; }; const IdentHash& GetDestination () const { return m_Destination; };
int GetNumExcludedPeers () const { return m_ExcludedPeers.size (); }; int GetNumExcludedPeers () const { return m_ExcludedPeers.size (); };
const std::set<IdentHash>& GetExcludedPeers () { return m_ExcludedPeers; }; const std::set<IdentHash>& GetExcludedPeers () { return m_ExcludedPeers; };
const RouterInfo * GetLastRouter () const { return m_LastRouter; }; void ClearExcludedPeers ();
const RouterInfo * GetLastRouter () const { return m_LastRouter; };
const i2p::tunnel::InboundTunnel * GetLastReplyTunnel () const { return m_LastReplyTunnel; }; const i2p::tunnel::InboundTunnel * GetLastReplyTunnel () const { return m_LastReplyTunnel; };
bool IsExploratory () const { return m_IsExploratory; }; bool IsExploratory () const { return m_IsExploratory; };
bool IsLeaseSet () const { return m_IsLeaseSet; }; bool IsLeaseSet () const { return m_IsLeaseSet; };

View File

@ -1,5 +1,6 @@
#include <fstream> #include <fstream>
#include <algorithm> #include <algorithm>
#include <boost/bind.hpp>
#include <cryptopp/gzip.h> #include <cryptopp/gzip.h>
#include "Log.h" #include "Log.h"
#include "RouterInfo.h" #include "RouterInfo.h"
@ -14,12 +15,14 @@ namespace i2p
{ {
namespace stream namespace stream
{ {
Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote): Stream::Stream (boost::asio::io_service& service, StreamingDestination * local,
m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false),
m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote),
m_OutboundTunnel (nullptr)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease ();
} }
Stream::~Stream () Stream::~Stream ()
@ -62,6 +65,7 @@ namespace stream
{ {
// we have received duplicate. Most likely our outbound tunnel is dead // we have received duplicate. Most likely our outbound tunnel is dead
LogPrint ("Duplicate message ", receivedSeqn, " received"); LogPrint ("Duplicate message ", receivedSeqn, " received");
UpdateCurrentRemoteLease (); // pick another lease
m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel
if (m_OutboundTunnel) if (m_OutboundTunnel)
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
@ -276,11 +280,12 @@ namespace stream
m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel ();
if (m_OutboundTunnel) if (m_OutboundTunnel)
{ {
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (!leases.empty ()) if (ts >= m_CurrentRemoteLease.endDate)
UpdateCurrentRemoteLease ();
if (ts < m_CurrentRemoteLease.endDate)
{ {
auto& lease = *leases.begin (); // TODO: m_OutboundTunnel->SendTunnelDataMsg (m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID, msg);
m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg);
return true; return true;
} }
else else
@ -296,8 +301,19 @@ namespace stream
} }
return false; return false;
} }
void Stream::UpdateCurrentRemoteLease ()
{
auto leases = m_RemoteLeaseSet.GetNonExpiredLeases ();
if (!leases.empty ())
{
uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1);
m_CurrentRemoteLease = leases[i];
}
else
m_CurrentRemoteLease.endDate = 0;
}
StreamingDestination * sharedLocalDestination = nullptr;
StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr) StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr)
{ {
@ -339,9 +355,10 @@ namespace stream
} }
} }
Stream * StreamingDestination::CreateNewStream (const i2p::data::LeaseSet& remote) Stream * StreamingDestination::CreateNewStream (boost::asio::io_service& service,
const i2p::data::LeaseSet& remote)
{ {
Stream * s = new Stream (this, remote); Stream * s = new Stream (service, this, remote);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
@ -402,14 +419,14 @@ namespace stream
size += 32; // tunnel_gw size += 32; // tunnel_gw
*(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ());
size += 4; // tunnel_id size += 4; // tunnel_id
uint64_t ts = tunnel->GetCreationTime () + i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT; uint64_t ts = tunnel->GetCreationTime () + i2p::tunnel::TUNNEL_EXPIRATION_TIMEOUT - 60; // 1 minute before expiration
ts *= 1000; // in milliseconds ts *= 1000; // in milliseconds
*(uint64_t *)(buf + size) = htobe64 (ts); *(uint64_t *)(buf + size) = htobe64 (ts);
size += 8; // end_date size += 8; // end_date
} }
Sign (buf, size, buf+ size); Sign (buf, size, buf+ size);
size += 40; // signature size += 40; // signature
LogPrint ("Local LeaseSet of ", tunnels.size (), " leases created");
m->len += size + sizeof (I2NPDatabaseStoreMsg); m->len += size + sizeof (I2NPDatabaseStoreMsg);
FillI2NPMessageHeader (m, eI2NPDatabaseStore); FillI2NPMessageHeader (m, eI2NPDatabaseStore);
return m; return m;
@ -420,32 +437,83 @@ namespace stream
CryptoPP::DSA::Signer signer (m_SigningPrivateKey); CryptoPP::DSA::Signer signer (m_SigningPrivateKey);
signer.SignMessage (i2p::context.GetRandomNumberGenerator (), buf, len, signature); signer.SignMessage (i2p::context.GetRandomNumberGenerator (), buf, len, signature);
} }
StreamingDestinations destinations;
void StreamingDestinations::Start ()
{
if (!m_SharedLocalDestination)
m_SharedLocalDestination = new StreamingDestination ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this));
}
void StreamingDestinations::Stop ()
{
delete m_SharedLocalDestination;
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
void StreamingDestinations::Run ()
{
m_Service.run ();
}
Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote)
{
if (!m_SharedLocalDestination) return nullptr;
return m_SharedLocalDestination->CreateNewStream (m_Service, remote);
}
void StreamingDestinations::DeleteClientStream (Stream * stream)
{
if (m_SharedLocalDestination)
m_SharedLocalDestination->DeleteStream (stream);
else
delete stream;
}
void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet));
}
void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
// TODO: we have onle one destination, might be more
if (m_SharedLocalDestination)
m_SharedLocalDestination->HandleNextPacket (packet);
}
Stream * CreateStream (const i2p::data::LeaseSet& remote) Stream * CreateStream (const i2p::data::LeaseSet& remote)
{ {
if (!sharedLocalDestination) return destinations.CreateClientStream (remote);
sharedLocalDestination = new StreamingDestination ();
return sharedLocalDestination->CreateNewStream (remote);
} }
void DeleteStream (Stream * stream) void DeleteStream (Stream * stream)
{ {
if (sharedLocalDestination) destinations.DeleteClientStream (stream);
sharedLocalDestination->DeleteStream (stream);
} }
void StartStreaming () void StartStreaming ()
{ {
if (!sharedLocalDestination) destinations.Start ();
sharedLocalDestination = new StreamingDestination ();
} }
void StopStreaming () void StopStreaming ()
{ {
delete sharedLocalDestination; destinations.Stop ();
} }
void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len) void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len)
{ {
uint32_t length = be32toh (*(uint32_t *)buf); uint32_t length = be32toh (*(uint32_t *)buf);
buf += 4; buf += 4;
@ -465,10 +533,8 @@ namespace stream
uncompressed->len = MAX_PACKET_SIZE; uncompressed->len = MAX_PACKET_SIZE;
} }
decompressor.Get (uncompressed->buf, uncompressed->len); decompressor.Get (uncompressed->buf, uncompressed->len);
// then forward to streaming engine // then forward to streaming engine thread
// TODO: we have onle one destination, might be more destinations.HandleNextPacket (destination, uncompressed);
if (sharedLocalDestination)
sharedLocalDestination->HandleNextPacket (uncompressed);
} }
else else
LogPrint ("Data: protocol ", buf[9], " is not supported"); LogPrint ("Data: protocol ", buf[9], " is not supported");

View File

@ -5,6 +5,8 @@
#include <string> #include <string>
#include <map> #include <map>
#include <set> #include <set>
#include <thread>
#include <boost/asio.hpp>
#include <cryptopp/dsa.h> #include <cryptopp/dsa.h>
#include "I2PEndian.h" #include "I2PEndian.h"
#include "Queue.h" #include "Queue.h"
@ -67,7 +69,7 @@ namespace stream
{ {
public: public:
Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote); Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote);
~Stream (); ~Stream ();
uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetSendStreamID () const { return m_SendStreamID; };
uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; };
@ -90,13 +92,17 @@ namespace stream
void SavePacket (Packet * packet); void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet); void ProcessPacket (Packet * packet);
void UpdateCurrentRemoteLease ();
private: private:
boost::asio::io_service& m_Service;
uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber;
bool m_IsOpen, m_LeaseSetUpdated; bool m_IsOpen, m_LeaseSetUpdated;
StreamingDestination * m_LocalDestination; StreamingDestination * m_LocalDestination;
const i2p::data::LeaseSet& m_RemoteLeaseSet; const i2p::data::LeaseSet& m_RemoteLeaseSet;
i2p::data::Lease m_CurrentRemoteLease;
i2p::util::Queue<Packet> m_ReceiveQueue; i2p::util::Queue<Packet> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
i2p::tunnel::OutboundTunnel * m_OutboundTunnel; i2p::tunnel::OutboundTunnel * m_OutboundTunnel;
@ -116,7 +122,7 @@ namespace stream
i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; };
void Sign (uint8_t * buf, int len, uint8_t * signature) const; void Sign (uint8_t * buf, int len, uint8_t * signature) const;
Stream * CreateNewStream (const i2p::data::LeaseSet& remote); Stream * CreateNewStream (boost::asio::io_service& service, const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream); void DeleteStream (Stream * stream);
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
@ -139,13 +145,44 @@ namespace stream
CryptoPP::DSA::PrivateKey m_SigningPrivateKey; CryptoPP::DSA::PrivateKey m_SigningPrivateKey;
}; };
class StreamingDestinations
{
public:
StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr),
m_Work (m_Service), m_SharedLocalDestination (nullptr) {};
~StreamingDestinations () {};
void Start ();
void Stop ();
void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet);
Stream * CreateClientStream (const i2p::data::LeaseSet& remote);
void DeleteClientStream (Stream * stream);
private:
void Run ();
void PostNextPacket (i2p::data::IdentHash destination, Packet * packet);
private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
StreamingDestination * m_SharedLocalDestination;
};
Stream * CreateStream (const i2p::data::LeaseSet& remote); Stream * CreateStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream); void DeleteStream (Stream * stream);
void StartStreaming (); void StartStreaming ();
void StopStreaming (); void StopStreaming ();
// assuming data is I2CP message // assuming data is I2CP message
void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len); void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len);
} }
} }

View File

@ -14,7 +14,8 @@ namespace i2p
namespace tunnel namespace tunnel
{ {
Tunnel::Tunnel (TunnelConfig * config): m_Config (config), m_Pool (nullptr), m_IsEstablished (false) Tunnel::Tunnel (TunnelConfig * config): m_Config (config), m_Pool (nullptr),
m_IsEstablished (false), m_IsFailed (false)
{ {
} }
@ -130,6 +131,7 @@ namespace tunnel
void InboundTunnel::HandleTunnelDataMsg (I2NPMessage * msg) void InboundTunnel::HandleTunnelDataMsg (I2NPMessage * msg)
{ {
if (IsFailed ()) SetFailed (false); // incoming messages means a tunnel is alive
msg->from = this; msg->from = this;
EncryptTunnelMsg (msg); EncryptTunnelMsg (msg);
m_Endpoint.HandleDecryptedTunnelDataMsg (msg); m_Endpoint.HandleDecryptedTunnelDataMsg (msg);
@ -225,11 +227,14 @@ namespace tunnel
InboundTunnel * tunnel = nullptr; InboundTunnel * tunnel = nullptr;
size_t minReceived = 0; size_t minReceived = 0;
for (auto it : m_InboundTunnels) for (auto it : m_InboundTunnels)
{
if (it.second->IsFailed ()) continue;
if (!tunnel || it.second->GetNumReceivedBytes () < minReceived) if (!tunnel || it.second->GetNumReceivedBytes () < minReceived)
{ {
tunnel = it.second; tunnel = it.second;
minReceived = it.second->GetNumReceivedBytes (); minReceived = it.second->GetNumReceivedBytes ();
} }
}
return tunnel; return tunnel;
} }
@ -240,7 +245,7 @@ namespace tunnel
for (auto it : m_InboundTunnels) for (auto it : m_InboundTunnels)
{ {
if (i >= num) break; if (i >= num) break;
if (it.second->GetNextIdentHash () != i2p::context.GetRouterInfo ().GetIdentHash ()) if (!it.second->IsFailed () && it.second->GetNextIdentHash () != i2p::context.GetRouterInfo ().GetIdentHash ())
{ {
// exclude one hop tunnels // exclude one hop tunnels
v.push_back (it.second); v.push_back (it.second);
@ -254,23 +259,17 @@ namespace tunnel
{ {
CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator ();
uint32_t ind = rnd.GenerateWord32 (0, m_OutboundTunnels.size () - 1), i = 0; uint32_t ind = rnd.GenerateWord32 (0, m_OutboundTunnels.size () - 1), i = 0;
OutboundTunnel * tunnel = nullptr;
for (auto it: m_OutboundTunnels) for (auto it: m_OutboundTunnels)
{ {
if (i >= ind) return it; if (i >= ind) return it;
else i++; if (!it->IsFailed ())
}
return nullptr;
// TODO: implement it base on profiling
/*OutboundTunnel * tunnel = nullptr;
size_t minSent = 0;
for (auto it : m_OutboundTunnels)
if (!tunnel || it->GetNumSentBytes () < minSent)
{ {
tunnel = it; tunnel = it;
minSent = it->GetNumSentBytes (); i++;
} }
return tunnel;*/ }
return tunnel;
} }
TunnelPool * Tunnels::CreateTunnelPool (i2p::data::LocalDestination * localDestination) TunnelPool * Tunnels::CreateTunnelPool (i2p::data::LocalDestination * localDestination)

View File

@ -37,6 +37,9 @@ namespace tunnel
TunnelConfig * GetTunnelConfig () const { return m_Config; } TunnelConfig * GetTunnelConfig () const { return m_Config; }
bool IsEstablished () const { return m_IsEstablished; }; bool IsEstablished () const { return m_IsEstablished; };
bool IsFailed () const { return m_IsFailed; };
void SetFailed (bool failed) { m_IsFailed = failed; }
TunnelPool * GetTunnelPool () const { return m_Pool; }; TunnelPool * GetTunnelPool () const { return m_Pool; };
void SetTunnelPool (TunnelPool * pool) { m_Pool = pool; }; void SetTunnelPool (TunnelPool * pool) { m_Pool = pool; };
@ -57,7 +60,7 @@ namespace tunnel
TunnelConfig * m_Config; TunnelConfig * m_Config;
TunnelPool * m_Pool; // pool, tunnel belongs to, or null TunnelPool * m_Pool; // pool, tunnel belongs to, or null
bool m_IsEstablished; bool m_IsEstablished, m_IsFailed;
CryptoPP::ECB_Mode<CryptoPP::AES>::Decryption m_ECBDecryption; CryptoPP::ECB_Mode<CryptoPP::AES>::Decryption m_ECBDecryption;
CryptoPP::CBC_Mode<CryptoPP::AES>::Decryption m_CBCDecryption; CryptoPP::CBC_Mode<CryptoPP::AES>::Decryption m_CBCDecryption;

View File

@ -31,8 +31,6 @@ namespace tunnel
void TunnelPool::TunnelCreated (InboundTunnel * createdTunnel) void TunnelPool::TunnelCreated (InboundTunnel * createdTunnel)
{ {
m_InboundTunnels.insert (createdTunnel); m_InboundTunnels.insert (createdTunnel);
if (m_LocalDestination)
m_LocalDestination->UpdateLeaseSet ();
} }
void TunnelPool::TunnelExpired (InboundTunnel * expiredTunnel) void TunnelPool::TunnelExpired (InboundTunnel * expiredTunnel)
@ -54,10 +52,10 @@ namespace tunnel
void TunnelPool::TunnelExpired (OutboundTunnel * expiredTunnel) void TunnelPool::TunnelExpired (OutboundTunnel * expiredTunnel)
{ {
if (expiredTunnel) if (expiredTunnel)
{ {
expiredTunnel->SetTunnelPool (nullptr); expiredTunnel->SetTunnelPool (nullptr);
m_OutboundTunnels.erase (expiredTunnel); m_OutboundTunnels.erase (expiredTunnel);
} }
if (expiredTunnel == m_LastOutboundTunnel) if (expiredTunnel == m_LastOutboundTunnel)
m_LastOutboundTunnel = nullptr; m_LastOutboundTunnel = nullptr;
} }
@ -69,8 +67,11 @@ namespace tunnel
for (auto it : m_InboundTunnels) for (auto it : m_InboundTunnels)
{ {
if (i >= num) break; if (i >= num) break;
v.push_back (it); if (!it->IsFailed ())
i++; {
v.push_back (it);
i++;
}
} }
return v; return v;
} }
@ -82,7 +83,7 @@ namespace tunnel
if (m_LastOutboundTunnel && tunnel == m_LastOutboundTunnel) if (m_LastOutboundTunnel && tunnel == m_LastOutboundTunnel)
{ {
for (auto it: m_OutboundTunnels) for (auto it: m_OutboundTunnels)
if (it != m_LastOutboundTunnel) if (it != m_LastOutboundTunnel && !it->IsFailed ())
{ {
tunnel = it; tunnel = it;
break; break;
@ -109,19 +110,33 @@ namespace tunnel
{ {
LogPrint ("Tunnel test ", (int)it.first, " failed"); LogPrint ("Tunnel test ", (int)it.first, " failed");
// both outbound and inbound tunnels considered as invalid // both outbound and inbound tunnels considered as invalid
TunnelExpired (it.second.first); it.second.first->SetFailed (true);
TunnelExpired (it.second.second); it.second.second->SetFailed (true);
} }
m_Tests.clear (); m_Tests.clear ();
auto it1 = m_OutboundTunnels.begin (); auto it1 = m_OutboundTunnels.begin ();
auto it2 = m_InboundTunnels.begin (); auto it2 = m_InboundTunnels.begin ();
while (it1 != m_OutboundTunnels.end () && it2 != m_InboundTunnels.end ()) while (it1 != m_OutboundTunnels.end () && it2 != m_InboundTunnels.end ())
{ {
uint32_t msgID = rnd.GenerateWord32 (); bool failed = false;
m_Tests[msgID] = std::make_pair (*it1, *it2); if ((*it1)->IsFailed ())
(*it1)->SendTunnelDataMsg ((*it2)->GetNextIdentHash (), (*it2)->GetNextTunnelID (), {
CreateDeliveryStatusMsg (msgID)); failed = true;
it1++; it2++; it1++;
}
if ((*it2)->IsFailed ())
{
failed = true;
it2++;
}
if (!failed)
{
uint32_t msgID = rnd.GenerateWord32 ();
m_Tests[msgID] = std::make_pair (*it1, *it2);
(*it1)->SendTunnelDataMsg ((*it2)->GetNextIdentHash (), (*it2)->GetNextTunnelID (),
CreateDeliveryStatusMsg (msgID));
it1++; it2++;
}
} }
} }