Browse Source

shared path between streams

pull/376/head
orignal 9 years ago
parent
commit
93720fffd4
  1. 21
      Garlic.cpp
  2. 20
      Garlic.h
  3. 4
      LeaseSet.cpp
  4. 2
      LeaseSet.h
  5. 21
      Streaming.cpp
  6. 2
      Streaming.h

21
Garlic.cpp

@ -43,6 +43,26 @@ namespace garlic
m_UnconfirmedTagsMsgs.clear (); m_UnconfirmedTagsMsgs.clear ();
} }
std::shared_ptr<GarlicRoutingPath> GarlicRoutingSession::GetSharedRoutingPath ()
{
if (!m_SharedRoutingPath) return nullptr;
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
if (!m_SharedRoutingPath->outboundTunnel->IsEstablished () ||
ts*1000LL > m_SharedRoutingPath->remoteLease->endDate ||
ts > m_SharedRoutingPath->updateTime + ROUTING_PATH_EXPIRATION_TIMEOUT)
m_SharedRoutingPath = nullptr;
return m_SharedRoutingPath;
}
void GarlicRoutingSession::SetSharedRoutingPath (std::shared_ptr<GarlicRoutingPath> path)
{
if (path && path->outboundTunnel && path->remoteLease)
path->updateTime = i2p::util::GetSecondsSinceEpoch ();
else
path = nullptr;
m_SharedRoutingPath = path;
}
GarlicRoutingSession::UnconfirmedTags * GarlicRoutingSession::GenerateSessionTags () GarlicRoutingSession::UnconfirmedTags * GarlicRoutingSession::GenerateSessionTags ()
{ {
auto tags = new UnconfirmedTags (m_NumTags); auto tags = new UnconfirmedTags (m_NumTags);
@ -570,6 +590,7 @@ namespace garlic
std::unique_lock<std::mutex> l(m_SessionsMutex); std::unique_lock<std::mutex> l(m_SessionsMutex);
for (auto it = m_Sessions.begin (); it != m_Sessions.end ();) for (auto it = m_Sessions.begin (); it != m_Sessions.end ();)
{ {
it->second->GetSharedRoutingPath (); // delete shared path if necessary
if (!it->second->CleanupExpiredTags ()) if (!it->second->CleanupExpiredTags ())
{ {
LogPrint (eLogInfo, "Routing session to ", it->first.ToBase32 (), " deleted"); LogPrint (eLogInfo, "Routing session to ", it->first.ToBase32 (), " deleted");

20
Garlic.h

@ -16,6 +16,11 @@
namespace i2p namespace i2p
{ {
namespace tunnel
{
class OutboundTunnel;
}
namespace garlic namespace garlic
{ {
@ -27,19 +32,18 @@ namespace garlic
eGarlicDeliveryTypeTunnel = 3 eGarlicDeliveryTypeTunnel = 3
}; };
#pragma pack(1)
struct ElGamalBlock struct ElGamalBlock
{ {
uint8_t sessionKey[32]; uint8_t sessionKey[32];
uint8_t preIV[32]; uint8_t preIV[32];
uint8_t padding[158]; uint8_t padding[158];
}; };
#pragma pack()
const int INCOMING_TAGS_EXPIRATION_TIMEOUT = 960; // 16 minutes const int INCOMING_TAGS_EXPIRATION_TIMEOUT = 960; // 16 minutes
const int OUTGOING_TAGS_EXPIRATION_TIMEOUT = 720; // 12 minutes const int OUTGOING_TAGS_EXPIRATION_TIMEOUT = 720; // 12 minutes
const int OUTGOING_TAGS_CONFIRMATION_TIMEOUT = 10; // 10 seconds const int OUTGOING_TAGS_CONFIRMATION_TIMEOUT = 10; // 10 seconds
const int LEASET_CONFIRMATION_TIMEOUT = 4000; // in milliseconds const int LEASET_CONFIRMATION_TIMEOUT = 4000; // in milliseconds
const int ROUTING_PATH_EXPIRATION_TIMEOUT = 30; // 30 seconds
struct SessionTag: public i2p::data::Tag<32> struct SessionTag: public i2p::data::Tag<32>
{ {
@ -54,6 +58,13 @@ namespace garlic
uint32_t creationTime; // seconds since epoch uint32_t creationTime; // seconds since epoch
}; };
struct GarlicRoutingPath
{
std::shared_ptr<i2p::tunnel::OutboundTunnel> outboundTunnel;
std::shared_ptr<const i2p::data::Lease> remoteLease;
uint32_t updateTime; // seconds since epoch
};
class GarlicDestination; class GarlicDestination;
class GarlicRoutingSession: public std::enable_shared_from_this<GarlicRoutingSession> class GarlicRoutingSession: public std::enable_shared_from_this<GarlicRoutingSession>
{ {
@ -89,6 +100,9 @@ namespace garlic
if (m_LeaseSetUpdateStatus != eLeaseSetDoNotSend) m_LeaseSetUpdateStatus = eLeaseSetUpdated; if (m_LeaseSetUpdateStatus != eLeaseSetDoNotSend) m_LeaseSetUpdateStatus = eLeaseSetUpdated;
}; };
std::shared_ptr<GarlicRoutingPath> GetSharedRoutingPath ();
void SetSharedRoutingPath (std::shared_ptr<GarlicRoutingPath> path);
private: private:
size_t CreateAESBlock (uint8_t * buf, std::shared_ptr<const I2NPMessage> msg); size_t CreateAESBlock (uint8_t * buf, std::shared_ptr<const I2NPMessage> msg);
@ -115,6 +129,8 @@ namespace garlic
i2p::crypto::CBCEncryption m_Encryption; i2p::crypto::CBCEncryption m_Encryption;
std::unique_ptr<const i2p::crypto::ElGamalEncryption> m_ElGamalEncryption; std::unique_ptr<const i2p::crypto::ElGamalEncryption> m_ElGamalEncryption;
std::shared_ptr<GarlicRoutingPath> m_SharedRoutingPath;
public: public:
// for HTTP only // for HTTP only
size_t GetNumOutgoingTags () const { return m_SessionTags.size (); }; size_t GetNumOutgoingTags () const { return m_SessionTags.size (); };

4
LeaseSet.cpp

@ -180,10 +180,10 @@ namespace data
} }
} }
const std::vector<std::shared_ptr<Lease> > LeaseSet::GetNonExpiredLeases (bool withThreshold) const const std::vector<std::shared_ptr<const Lease> > LeaseSet::GetNonExpiredLeases (bool withThreshold) const
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
std::vector<std::shared_ptr<Lease> > leases; std::vector<std::shared_ptr<const Lease> > leases;
for (auto it: m_Leases) for (auto it: m_Leases)
{ {
auto endDate = it->endDate; auto endDate = it->endDate;

2
LeaseSet.h

@ -52,7 +52,7 @@ namespace data
const uint8_t * GetBuffer () const { return m_Buffer; }; const uint8_t * GetBuffer () const { return m_Buffer; };
size_t GetBufferLen () const { return m_BufferLen; }; size_t GetBufferLen () const { return m_BufferLen; };
bool IsValid () const { return m_IsValid; }; bool IsValid () const { return m_IsValid; };
const std::vector<std::shared_ptr<Lease> > GetNonExpiredLeases (bool withThreshold = true) const; const std::vector<std::shared_ptr<const Lease> > GetNonExpiredLeases (bool withThreshold = true) const;
bool HasExpiredLeases () const; bool HasExpiredLeases () const;
bool IsExpired () const; bool IsExpired () const;
bool IsEmpty () const { return m_Leases.empty (); }; bool IsEmpty () const { return m_Leases.empty (); };

21
Streaming.cpp

@ -271,6 +271,10 @@ namespace stream
m_LastWindowSizeIncreaseTime = ts; m_LastWindowSizeIncreaseTime = ts;
} }
} }
if (!seqn && m_RoutingSession) // first message confirmed
m_RoutingSession->SetSharedRoutingPath (
std::make_shared<i2p::garlic::GarlicRoutingPath> (
i2p::garlic::GarlicRoutingPath{m_CurrentOutboundTunnel, m_CurrentRemoteLease, 0}));
} }
else else
break; break;
@ -589,6 +593,21 @@ namespace stream
return; return;
} }
} }
if (!m_CurrentOutboundTunnel) // first message to send
{
// try to get shared path first
if (!m_RoutingSession)
m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true);
if (m_RoutingSession)
{
auto routingPath = m_RoutingSession->GetSharedRoutingPath ();
if (routingPath)
{
m_CurrentOutboundTunnel = routingPath->outboundTunnel;
m_CurrentRemoteLease = routingPath->remoteLease;
}
}
}
if (!m_CurrentOutboundTunnel || !m_CurrentOutboundTunnel->IsEstablished ()) if (!m_CurrentOutboundTunnel || !m_CurrentOutboundTunnel->IsEstablished ())
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel);
if (!m_CurrentOutboundTunnel) if (!m_CurrentOutboundTunnel)
@ -669,11 +688,13 @@ namespace stream
m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change first time
// no break here // no break here
case 4: case 4:
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream"); LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream");
break; break;
case 3: case 3:
// pick another outbound tunnel // pick another outbound tunnel
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr);
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream"); LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream");
break; break;

2
Streaming.h

@ -172,7 +172,7 @@ namespace stream
std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity; std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity;
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet; std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession; std::shared_ptr<i2p::garlic::GarlicRoutingSession> m_RoutingSession;
std::shared_ptr<i2p::data::Lease> m_CurrentRemoteLease; std::shared_ptr<const i2p::data::Lease> m_CurrentRemoteLease;
std::shared_ptr<i2p::tunnel::OutboundTunnel> m_CurrentOutboundTunnel; std::shared_ptr<i2p::tunnel::OutboundTunnel> m_CurrentOutboundTunnel;
std::queue<Packet *> m_ReceiveQueue; std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;

Loading…
Cancel
Save