Browse Source

remote LeaseSets per local destination

pull/102/head
orignal 10 years ago
parent
commit
eff0e13f31
  1. 59
      Destination.cpp
  2. 5
      Destination.h
  3. 4
      HTTPServer.cpp
  4. 4
      I2PTunnel.cpp
  5. 12
      SAM.cpp
  6. 2
      SAM.h
  7. 2
      Streaming.cpp

59
Destination.cpp

@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
#include <cryptopp/gzip.h>
#include "Log.h"
#include "util.h"
#include "NetDb.h"
#include "Destination.h"
namespace i2p
@ -71,6 +72,9 @@ namespace stream @@ -71,6 +72,9 @@ namespace stream
StreamingDestination::~StreamingDestination ()
{
Stop ();
for (auto it: m_RemoteLeaseSets)
delete it.second;
delete m_LeaseSet;
}
@ -279,11 +283,66 @@ namespace stream @@ -279,11 +283,66 @@ namespace stream
case eI2NPData:
HandleDataMessage (buf + sizeof (I2NPHeader), be16toh (header->size));
break;
case eI2NPDatabaseStore:
HandleDatabaseStoreMessage (buf + sizeof (I2NPHeader), be16toh (header->size));
i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf))); // TODO: remove
break;
default:
i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf)));
}
}
void StreamingDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len)
{
I2NPDatabaseStoreMsg * msg = (I2NPDatabaseStoreMsg *)buf;
size_t offset = sizeof (I2NPDatabaseStoreMsg);
if (msg->replyToken) // TODO:
offset += 36;
if (msg->type == 1) // LeaseSet
{
LogPrint ("Remote LeaseSet");
auto it = m_RemoteLeaseSets.find (msg->key);
if (it != m_RemoteLeaseSets.end ())
{
it->second->Update (buf + offset, len - offset);
LogPrint ("Remote LeaseSet updated");
}
else
{
LogPrint ("New remote LeaseSet added");
m_RemoteLeaseSets[msg->key] = new i2p::data::LeaseSet (buf + offset, len - offset);
}
}
else
LogPrint ("Unexpected client's DatabaseStore type ", msg->type, ". Dropped");
}
const i2p::data::LeaseSet * StreamingDestination::FindLeaseSet (const i2p::data::IdentHash& ident)
{
auto it = m_RemoteLeaseSets.find (ident);
if (it != m_RemoteLeaseSets.end ())
{
if (it->second->HasNonExpiredLeases ())
return it->second;
else
{
LogPrint ("All leases of remote LeaseSet expired. Request it");
i2p::data::netdb.RequestDestination (ident, true, m_Pool);
}
}
else
{
auto ls = i2p::data::netdb.FindLeaseSet (ident);
if (ls)
{
ls = new i2p::data::LeaseSet (*ls);
m_RemoteLeaseSets[ident] = ls;
return ls;
}
}
return nullptr;
}
StreamingDestinations destinations;
void StreamingDestinations::Start ()
{

5
Destination.h

@ -6,7 +6,7 @@ @@ -6,7 +6,7 @@
#include "Identity.h"
#include "TunnelPool.h"
#include "CryptoConst.h"
#include "NetDb.h"
#include "LeaseSet.h"
#include "Garlic.h"
#include "Streaming.h"
@ -36,6 +36,7 @@ namespace stream @@ -36,6 +36,7 @@ namespace stream
void HandleNextPacket (Packet * packet);
void SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs);
void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; };
const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident);
// I2CP
void HandleDataMessage (const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len);
@ -59,6 +60,7 @@ namespace stream @@ -59,6 +60,7 @@ namespace stream
void Run ();
Stream * CreateNewIncomingStream ();
void UpdateLeaseSet ();
void HandleDatabaseStoreMessage (const uint8_t * buf, size_t len);
private:
@ -66,6 +68,7 @@ namespace stream @@ -66,6 +68,7 @@ namespace stream
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
std::map<i2p::data::IdentHash, i2p::data::LeaseSet *> m_RemoteLeaseSets;
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams;

4
HTTPServer.cpp

@ -850,12 +850,12 @@ namespace util @@ -850,12 +850,12 @@ namespace util
void HTTPConnection::SendToDestination (const i2p::data::IdentHash& destination, const char * buf, size_t len)
{
auto leaseSet = i2p::data::netdb.FindLeaseSet (destination);
auto leaseSet = i2p::stream::GetSharedLocalDestination ()->FindLeaseSet (destination);
if (!leaseSet || !leaseSet->HasNonExpiredLeases ())
{
i2p::data::netdb.Subscribe(destination, i2p::stream::GetSharedLocalDestination ()->GetTunnelPool ());
std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds
leaseSet = i2p::data::netdb.FindLeaseSet (destination);
leaseSet = i2p::stream::GetSharedLocalDestination ()->FindLeaseSet (destination);
if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet
{
SendReply (leaseSet ? "<html>" + itoopieImage + "<br>Leases expired</html>" : "<html>" + itoopieImage + "LeaseSet not found</html>", 504);

4
I2PTunnel.cpp

@ -164,7 +164,7 @@ namespace stream @@ -164,7 +164,7 @@ namespace stream
if (m_DestinationIdentHash)
{
i2p::data::netdb.Subscribe (*m_DestinationIdentHash, GetLocalDestination ()->GetTunnelPool ());
m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (*m_DestinationIdentHash);
m_RemoteLeaseSet = GetLocalDestination ()->FindLeaseSet (*m_DestinationIdentHash);
}
else
LogPrint ("I2PTunnel unknown destination ", m_Destination);
@ -194,7 +194,7 @@ namespace stream @@ -194,7 +194,7 @@ namespace stream
{
// try to get it
if (m_DestinationIdentHash)
m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (*m_DestinationIdentHash);
m_RemoteLeaseSet = GetLocalDestination ()->FindLeaseSet (*m_DestinationIdentHash);
else
{
i2p::data::IdentHash identHash;

12
SAM.cpp

@ -60,7 +60,7 @@ namespace stream @@ -60,7 +60,7 @@ namespace stream
;
}
m_Socket.close ();
delete this;
// delete this;
}
void SAMSocket::ReceiveHandshake ()
@ -275,7 +275,7 @@ namespace stream @@ -275,7 +275,7 @@ namespace stream
{
if (!ecode) // timeout expired
{
auto leaseSet = i2p::data::netdb.FindLeaseSet (ident);
auto leaseSet = m_Session->localDestination->FindLeaseSet (ident);
if (leaseSet)
Connect (*leaseSet);
else
@ -290,7 +290,7 @@ namespace stream @@ -290,7 +290,7 @@ namespace stream
{
if (!ecode) // timeout expired
{
auto leaseSet = i2p::data::netdb.FindLeaseSet (ident);
auto leaseSet = m_Session->localDestination->FindLeaseSet (ident);
if (leaseSet)
SendNamingLookupReply (leaseSet);
else
@ -361,7 +361,7 @@ namespace stream @@ -361,7 +361,7 @@ namespace stream
SendNamingLookupReply (nullptr);
else if (i2p::data::netdb.GetAddressBook ().GetIdentHash (name, ident))
{
auto leaseSet = i2p::data::netdb.FindLeaseSet (ident);
auto leaseSet = m_Session->localDestination->FindLeaseSet (ident);
if (leaseSet)
SendNamingLookupReply (leaseSet);
else
@ -379,7 +379,7 @@ namespace stream @@ -379,7 +379,7 @@ namespace stream
}
}
void SAMSocket::SendNamingLookupReply (i2p::data::LeaseSet * leaseSet)
void SAMSocket::SendNamingLookupReply (const i2p::data::LeaseSet * leaseSet)
{
uint8_t buf[1024];
char pub[1024];
@ -448,7 +448,7 @@ namespace stream @@ -448,7 +448,7 @@ namespace stream
{
LogPrint ("SAM stream read error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted)
m_Socket.get_io_service ().post (boost::bind (&SAMSocket::Terminate, this));
Terminate ();
}
else
{

2
SAM.h

@ -94,7 +94,7 @@ namespace stream @@ -94,7 +94,7 @@ namespace stream
void Connect (const i2p::data::LeaseSet& remote);
void HandleStreamDestinationRequestTimer (const boost::system::error_code& ecode, i2p::data::IdentHash ident);
void HandleNamingLookupDestinationRequestTimer (const boost::system::error_code& ecode, i2p::data::IdentHash ident);
void SendNamingLookupReply (i2p::data::LeaseSet * leaseSet);
void SendNamingLookupReply (const i2p::data::LeaseSet * leaseSet);
private:

2
Streaming.cpp

@ -499,7 +499,7 @@ namespace stream @@ -499,7 +499,7 @@ namespace stream
{
if (!m_RemoteLeaseSet)
{
m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (m_RemoteIdentity.GetIdentHash ());
m_RemoteLeaseSet = m_LocalDestination.FindLeaseSet (m_RemoteIdentity.GetIdentHash ());
if (!m_RemoteLeaseSet)
LogPrint ("LeaseSet ", m_RemoteIdentity.GetIdentHash ().ToBase64 (), " not found");
}

Loading…
Cancel
Save