diff --git a/Destination.cpp b/Destination.cpp index fc6fa1a0..08c461b5 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -4,6 +4,7 @@ #include #include "Log.h" #include "util.h" +#include "NetDb.h" #include "Destination.h" namespace i2p @@ -71,6 +72,9 @@ namespace stream StreamingDestination::~StreamingDestination () { Stop (); + for (auto it: m_RemoteLeaseSets) + delete it.second; + delete m_LeaseSet; } @@ -278,11 +282,66 @@ namespace stream { case eI2NPData: HandleDataMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); + break; + case eI2NPDatabaseStore: + HandleDatabaseStoreMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); + i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf))); // TODO: remove break; default: i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf))); } } + + void StreamingDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) + { + I2NPDatabaseStoreMsg * msg = (I2NPDatabaseStoreMsg *)buf; + size_t offset = sizeof (I2NPDatabaseStoreMsg); + if (msg->replyToken) // TODO: + offset += 36; + if (msg->type == 1) // LeaseSet + { + LogPrint ("Remote LeaseSet"); + auto it = m_RemoteLeaseSets.find (msg->key); + if (it != m_RemoteLeaseSets.end ()) + { + it->second->Update (buf + offset, len - offset); + LogPrint ("Remote LeaseSet updated"); + } + else + { + LogPrint ("New remote LeaseSet added"); + m_RemoteLeaseSets[msg->key] = new i2p::data::LeaseSet (buf + offset, len - offset); + } + } + else + LogPrint ("Unexpected client's DatabaseStore type ", msg->type, ". Dropped"); + } + + const i2p::data::LeaseSet * StreamingDestination::FindLeaseSet (const i2p::data::IdentHash& ident) + { + auto it = m_RemoteLeaseSets.find (ident); + if (it != m_RemoteLeaseSets.end ()) + { + if (it->second->HasNonExpiredLeases ()) + return it->second; + else + { + LogPrint ("All leases of remote LeaseSet expired. Request it"); + i2p::data::netdb.RequestDestination (ident, true, m_Pool); + } + } + else + { + auto ls = i2p::data::netdb.FindLeaseSet (ident); + if (ls) + { + ls = new i2p::data::LeaseSet (*ls); + m_RemoteLeaseSets[ident] = ls; + return ls; + } + } + return nullptr; + } StreamingDestinations destinations; void StreamingDestinations::Start () diff --git a/Destination.h b/Destination.h index 11934f80..41ac99f6 100644 --- a/Destination.h +++ b/Destination.h @@ -6,7 +6,7 @@ #include "Identity.h" #include "TunnelPool.h" #include "CryptoConst.h" -#include "NetDb.h" +#include "LeaseSet.h" #include "Garlic.h" #include "Streaming.h" @@ -36,6 +36,7 @@ namespace stream void HandleNextPacket (Packet * packet); void SendTunnelDataMsgs (const std::vector& msgs); void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; }; + const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident); // I2CP void HandleDataMessage (const uint8_t * buf, size_t len); I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); @@ -59,6 +60,7 @@ namespace stream void Run (); Stream * CreateNewIncomingStream (); void UpdateLeaseSet (); + void HandleDatabaseStoreMessage (const uint8_t * buf, size_t len); private: @@ -66,6 +68,7 @@ namespace stream std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; + std::map m_RemoteLeaseSets; std::mutex m_StreamsMutex; std::map m_Streams; @@ -76,9 +79,9 @@ namespace stream i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel; i2p::data::LeaseSet * m_LeaseSet; bool m_IsPublic; - + std::function m_Acceptor; - + public: // for HTTP only diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 691c5ed4..dfca5ee8 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -850,12 +850,12 @@ namespace util void HTTPConnection::SendToDestination (const i2p::data::IdentHash& destination, const char * buf, size_t len) { - auto leaseSet = i2p::data::netdb.FindLeaseSet (destination); + auto leaseSet = i2p::stream::GetSharedLocalDestination ()->FindLeaseSet (destination); if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) { i2p::data::netdb.Subscribe(destination, i2p::stream::GetSharedLocalDestination ()->GetTunnelPool ()); std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds - leaseSet = i2p::data::netdb.FindLeaseSet (destination); + leaseSet = i2p::stream::GetSharedLocalDestination ()->FindLeaseSet (destination); if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet { SendReply (leaseSet ? "" + itoopieImage + "
Leases expired" : "" + itoopieImage + "LeaseSet not found", 504); diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index d2ae51dd..3a5bc033 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -164,7 +164,7 @@ namespace stream if (m_DestinationIdentHash) { i2p::data::netdb.Subscribe (*m_DestinationIdentHash, GetLocalDestination ()->GetTunnelPool ()); - m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (*m_DestinationIdentHash); + m_RemoteLeaseSet = GetLocalDestination ()->FindLeaseSet (*m_DestinationIdentHash); } else LogPrint ("I2PTunnel unknown destination ", m_Destination); @@ -194,7 +194,7 @@ namespace stream { // try to get it if (m_DestinationIdentHash) - m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (*m_DestinationIdentHash); + m_RemoteLeaseSet = GetLocalDestination ()->FindLeaseSet (*m_DestinationIdentHash); else { i2p::data::IdentHash identHash; diff --git a/SAM.cpp b/SAM.cpp index e834659c..08a62956 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -60,7 +60,7 @@ namespace stream ; } m_Socket.close (); - delete this; + // delete this; } void SAMSocket::ReceiveHandshake () @@ -275,7 +275,7 @@ namespace stream { if (!ecode) // timeout expired { - auto leaseSet = i2p::data::netdb.FindLeaseSet (ident); + auto leaseSet = m_Session->localDestination->FindLeaseSet (ident); if (leaseSet) Connect (*leaseSet); else @@ -290,7 +290,7 @@ namespace stream { if (!ecode) // timeout expired { - auto leaseSet = i2p::data::netdb.FindLeaseSet (ident); + auto leaseSet = m_Session->localDestination->FindLeaseSet (ident); if (leaseSet) SendNamingLookupReply (leaseSet); else @@ -361,7 +361,7 @@ namespace stream SendNamingLookupReply (nullptr); else if (i2p::data::netdb.GetAddressBook ().GetIdentHash (name, ident)) { - auto leaseSet = i2p::data::netdb.FindLeaseSet (ident); + auto leaseSet = m_Session->localDestination->FindLeaseSet (ident); if (leaseSet) SendNamingLookupReply (leaseSet); else @@ -379,7 +379,7 @@ namespace stream } } - void SAMSocket::SendNamingLookupReply (i2p::data::LeaseSet * leaseSet) + void SAMSocket::SendNamingLookupReply (const i2p::data::LeaseSet * leaseSet) { uint8_t buf[1024]; char pub[1024]; @@ -448,7 +448,7 @@ namespace stream { LogPrint ("SAM stream read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) - m_Socket.get_io_service ().post (boost::bind (&SAMSocket::Terminate, this)); + Terminate (); } else { diff --git a/SAM.h b/SAM.h index 97d51757..833d3f90 100644 --- a/SAM.h +++ b/SAM.h @@ -94,7 +94,7 @@ namespace stream void Connect (const i2p::data::LeaseSet& remote); void HandleStreamDestinationRequestTimer (const boost::system::error_code& ecode, i2p::data::IdentHash ident); void HandleNamingLookupDestinationRequestTimer (const boost::system::error_code& ecode, i2p::data::IdentHash ident); - void SendNamingLookupReply (i2p::data::LeaseSet * leaseSet); + void SendNamingLookupReply (const i2p::data::LeaseSet * leaseSet); private: diff --git a/Streaming.cpp b/Streaming.cpp index af46656b..a7a21658 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -499,7 +499,7 @@ namespace stream { if (!m_RemoteLeaseSet) { - m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); + m_RemoteLeaseSet = m_LocalDestination.FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); if (!m_RemoteLeaseSet) LogPrint ("LeaseSet ", m_RemoteIdentity.GetIdentHash ().ToBase64 (), " not found"); }