diff --git a/Garlic.cpp b/Garlic.cpp index 1db02493..ad824fd7 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -15,28 +15,40 @@ namespace i2p namespace garlic { GarlicRoutingSession::GarlicRoutingSession (const i2p::data::RoutingDestination * destination, int numTags): - m_Destination (destination), m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0) + m_Destination (destination), m_FirstMsgID (0), m_IsAcknowledged (false), + m_NumTags (numTags), m_NextTag (-1), m_SessionTags (0) { + // create new session tags and session key m_Rnd.GenerateBlock (m_SessionKey, 32); if (m_NumTags > 0) - { + { m_SessionTags = new uint8_t[m_NumTags*32]; - for (int i = 0; i < m_NumTags; i++) - m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); + GenerateSessionTags (); } + else + m_SessionTags = nullptr; } GarlicRoutingSession::~GarlicRoutingSession () { delete[] m_SessionTags; } - + + void GarlicRoutingSession::GenerateSessionTags () + { + if (m_SessionTags) + { + for (int i = 0; i < m_NumTags; i++) + m_Rnd.GenerateBlock (m_SessionTags + i*32, 32); + } + } + I2NPMessage * GarlicRoutingSession::WrapSingleMessage (I2NPMessage * msg, I2NPMessage * leaseSet) { I2NPMessage * m = NewI2NPMessage (); size_t len = 0; uint8_t * buf = m->GetPayload () + 4; // 4 bytes for length - if (m_NextTag < 0) // new session + if (m_NextTag < 0 || !m_NumTags) // new session { // create ElGamal block ElGamalBlock elGamal; @@ -44,23 +56,30 @@ namespace garlic m_Rnd.GenerateBlock (elGamal.preIV, 32); // Pre-IV uint8_t iv[32]; // IV is first 16 bytes CryptoPP::SHA256().CalculateDigest(iv, elGamal.preIV, 32); - i2p::crypto::ElGamalEncrypt (m_Destination->GetEncryptionPublicKey (), (uint8_t *)&elGamal, sizeof(elGamal), buf, true); - buf += 514; - // AES block + i2p::crypto::ElGamalEncrypt (m_Destination->GetEncryptionPublicKey (), (uint8_t *)&elGamal, sizeof(elGamal), buf, true); m_Encryption.SetKeyWithIV (m_SessionKey, 32, iv); - len += 514 + CreateAESBlock (buf, msg, leaseSet); + buf += 514; + len += 514; } else // existing session { // session tag - memcpy (buf, m_SessionTags + m_NextTag*32, 32); - buf += 32; + memcpy (buf, m_SessionTags + m_NextTag*32, 32); uint8_t iv[32]; // IV is first 16 bytes CryptoPP::SHA256().CalculateDigest(iv, m_SessionTags + m_NextTag*32, 32); m_Encryption.SetKeyWithIV (m_SessionKey, 32, iv); - // AES block - len += 32 + CreateAESBlock (buf, msg, leaseSet); + buf += 32; + len += 32; + + // re-create session tags if necessary + if (m_NextTag >= m_NumTags - 1) // we have used last tag + { + GenerateSessionTags (); + m_NextTag = -1; + } } + // AES block + len += CreateAESBlock (buf, msg, leaseSet); m_NextTag++; *(uint32_t *)(m->GetPayload ()) = htobe32 (len); m->len += len + 4; @@ -73,10 +92,13 @@ namespace garlic size_t GarlicRoutingSession::CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet) { size_t blockSize = 0; - *(uint16_t *)buf = htobe16 (m_NumTags); // tag count + *(uint16_t *)buf = m_NextTag < 0 ? htobe16 (m_NumTags) : 0; // tag count blockSize += 2; - memcpy (buf + blockSize, m_SessionTags, m_NumTags*32); // tags - blockSize += m_NumTags*32; + if (m_NextTag < 0) // session tags recreated + { + memcpy (buf + blockSize, m_SessionTags, m_NumTags*32); // tags + blockSize += m_NumTags*32; + } uint32_t * payloadSize = (uint32_t *)(buf + blockSize); blockSize += 4; uint8_t * payloadHash = buf + blockSize; @@ -97,18 +119,21 @@ namespace garlic size_t GarlicRoutingSession::CreateGarlicPayload (uint8_t * payload, I2NPMessage * msg, I2NPMessage * leaseSet) { uint64_t ts = i2p::util::GetMillisecondsSinceEpoch () + 5000; // 5 sec - uint32_t msgID = m_Rnd.GenerateWord32 (); + uint32_t msgID = m_Rnd.GenerateWord32 (); size_t size = 0; uint8_t * numCloves = payload + size; *numCloves = 0; size++; - - if (leaseSet) + + if (m_NextTag < 0) // new session { - // clove is DeliveryStatus is LeaseSet is presented + // clove is DeliveryStatus size += CreateDeliveryStatusClove (payload + size, msgID); (*numCloves)++; - + m_FirstMsgID = msgID; + } + if (leaseSet) + { // clove is our leaseSet if presented size += CreateGarlicClove (payload + size, leaseSet, false); (*numCloves)++; @@ -164,10 +189,11 @@ namespace garlic { buf[size] = eGarlicDeliveryTypeTunnel << 5; // delivery instructions flag tunnel size++; - *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); // tunnelID - size += 4; + // hash and tunnelID sequence is reversed for Garlic memcpy (buf + size, tunnel->GetNextIdentHash (), 32); // To Hash size += 32; + *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); // tunnelID + size += 4; } else { @@ -204,7 +230,22 @@ namespace garlic m_Sessions.clear (); } - I2NPMessage * GarlicRouting::WrapSingleMessage (const i2p::data::RoutingDestination * destination, + I2NPMessage * GarlicRouting::WrapSingleMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg) + { + if (!destination) return nullptr; + auto it = m_Sessions.find (destination->GetIdentHash ()); + if (it != m_Sessions.end ()) + { + m_Sessions.erase (it); + delete it->second; + } + GarlicRoutingSession * session = new GarlicRoutingSession (destination, 0); // not follow-on messages expected + m_Sessions[destination->GetIdentHash ()] = session; + + return session->WrapSingleMessage (msg, nullptr); + } + + I2NPMessage * GarlicRouting::WrapMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg, I2NPMessage * leaseSet) { if (!destination) return nullptr; @@ -219,13 +260,10 @@ namespace garlic } I2NPMessage * ret = session->WrapSingleMessage (msg, leaseSet); - if (session->GetNumRemainingSessionTags () <= 0) - { - m_Sessions.erase (destination->GetIdentHash ()); - delete session; - } + if (!session->GetNextTag ()) // tags have beed recreated + m_CreatedSessions[session->GetFirstMsgID ()] = session; return ret; - } + } void GarlicRouting::HandleGarlicMessage (uint8_t * buf, size_t len, bool isFromTunnel) { @@ -248,14 +286,18 @@ namespace garlic { // new session ElGamalBlock elGamal; - i2p::crypto::ElGamalDecrypt ( + if (i2p::crypto::ElGamalDecrypt ( isFromTunnel ? i2p::context.GetLeaseSetPrivateKey () : i2p::context.GetPrivateKey (), - buf, (uint8_t *)&elGamal, true); - uint8_t iv[32]; // IV is first 16 bytes - CryptoPP::SHA256().CalculateDigest(iv, elGamal.preIV, 32); - m_Decryption.SetKeyWithIV (elGamal.sessionKey, 32, iv); - m_Decryption.ProcessData(buf + 514, buf + 514, length - 514); - HandleAESBlock (buf + 514, length - 514, elGamal.sessionKey); + buf, (uint8_t *)&elGamal, true)) + { + uint8_t iv[32]; // IV is first 16 bytes + CryptoPP::SHA256().CalculateDigest(iv, elGamal.preIV, 32); + m_Decryption.SetKeyWithIV (elGamal.sessionKey, 32, iv); + m_Decryption.ProcessData(buf + 514, buf + 514, length - 514); + HandleAESBlock (buf + 514, length - 514, elGamal.sessionKey); + } + else + LogPrint ("Failed to decrypt garlic"); } } @@ -268,12 +310,26 @@ namespace garlic m_SessionTags[std::string ((const char *)(buf + i*32), 32)] = std::string ((const char *)sessionKey, 32); buf += tagCount*32; uint32_t payloadSize = be32toh (*(uint32_t *)buf); + if (payloadSize > len) + { + LogPrint ("Unexpected payload size ", payloadSize); + return; + } buf += 4; - buf += 32;// payload hash. TODO: verify it + uint8_t * payloadHash = buf; + buf += 32;// payload hash. if (*buf) // session key? buf += 32; // new session key buf++; // flag + // payload + uint8_t hash[32]; + CryptoPP::SHA256().CalculateDigest(hash, buf, payloadSize); + if (memcmp (hash, payloadHash, 32)) // payload hash doesn't match + { + LogPrint ("Wrong payload hash"); + return; + } HandleGarlicPayload (buf, payloadSize); } @@ -317,10 +373,11 @@ namespace garlic case eGarlicDeliveryTypeTunnel: { LogPrint ("Garlic type tunnel"); - uint32_t gwTunnel = be32toh (*(uint32_t *)buf); - buf += 4; + // gwHash and gwTunnel sequence is reverted uint8_t * gwHash = buf; buf += 32; + uint32_t gwTunnel = be32toh (*(uint32_t *)buf); + buf += 4; auto tunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); if (tunnel) // we have send it through an outbound tunnel { @@ -344,5 +401,17 @@ namespace garlic buf += 3; // Certificate } } + + void GarlicRouting::HandleDeliveryStatusMessage (uint8_t * buf, size_t len) + { + I2NPDeliveryStatusMsg * msg = (I2NPDeliveryStatusMsg *)buf; + auto it = m_CreatedSessions.find (be32toh (msg->msgID)); + if (it != m_CreatedSessions.end ()) + { + it->second->SetAcknowledged (true); + m_CreatedSessions.erase (it); + LogPrint ("Garlic message ", be32toh (msg->msgID), " acknowledged"); + } + } } } diff --git a/Garlic.h b/Garlic.h index 89f6bd90..c640eee3 100644 --- a/Garlic.h +++ b/Garlic.h @@ -40,8 +40,12 @@ namespace garlic GarlicRoutingSession (const i2p::data::RoutingDestination * destination, int numTags); ~GarlicRoutingSession (); I2NPMessage * WrapSingleMessage (I2NPMessage * msg, I2NPMessage * leaseSet); - int GetNumRemainingSessionTags () const { return m_NumTags - m_NextTag; }; + int GetNextTag () const { return m_NextTag; }; + uint32_t GetFirstMsgID () const { return m_FirstMsgID; }; + bool IsAcknowledged () const { return m_IsAcknowledged; }; + void SetAcknowledged (bool acknowledged) { m_IsAcknowledged = acknowledged; }; + private: size_t CreateAESBlock (uint8_t * buf, I2NPMessage * msg, I2NPMessage * leaseSet); @@ -49,10 +53,14 @@ namespace garlic size_t CreateGarlicClove (uint8_t * buf, I2NPMessage * msg, bool isDestination); size_t CreateDeliveryStatusClove (uint8_t * buf, uint32_t msgID); + void GenerateSessionTags (); + private: const i2p::data::RoutingDestination * m_Destination; uint8_t m_SessionKey[32]; + uint32_t m_FirstMsgID; // first message ID + bool m_IsAcknowledged; int m_NumTags, m_NextTag; uint8_t * m_SessionTags; // m_NumTags*32 bytes @@ -68,9 +76,11 @@ namespace garlic ~GarlicRouting (); void HandleGarlicMessage (uint8_t * buf, size_t len, bool isFromTunnel); + void HandleDeliveryStatusMessage (uint8_t * buf, size_t len); - I2NPMessage * WrapSingleMessage (const i2p::data::RoutingDestination * destination, - I2NPMessage * msg, I2NPMessage * leaseSet = nullptr); + I2NPMessage * WrapSingleMessage (const i2p::data::RoutingDestination * destination, I2NPMessage * msg); + I2NPMessage * WrapMessage (const i2p::data::RoutingDestination * destination, + I2NPMessage * msg, I2NPMessage * leaseSet = nullptr); private: @@ -81,6 +91,7 @@ namespace garlic // outgoing sessions std::map m_Sessions; + std::map m_CreatedSessions; // msgID -> session // incoming session std::map m_SessionTags; // tag -> key CryptoPP::CBC_Mode::Decryption m_Decryption; diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 577ef748..5aeac9ed 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -1,8 +1,12 @@ #include #include +#include "base64.h" +#include "Log.h" #include "Tunnel.h" #include "TransitTunnel.h" #include "Transports.h" +#include "NetDb.h" +#include "Streaming.h" #include "HTTPServer.h" namespace i2p @@ -20,16 +24,19 @@ namespace util std::vector HTTPConnection::reply::to_buffers() { std::vector buffers; - buffers.push_back (boost::asio::buffer ("HTTP/1.0 200 OK\r\n")); // always OK - for (std::size_t i = 0; i < headers.size(); ++i) - { - header& h = headers[i]; - buffers.push_back(boost::asio::buffer(h.name)); - buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator)); - buffers.push_back(boost::asio::buffer(h.value)); + if (headers.size () > 0) + { + buffers.push_back (boost::asio::buffer ("HTTP/1.0 200 OK\r\n")); // always OK + for (std::size_t i = 0; i < headers.size(); ++i) + { + header& h = headers[i]; + buffers.push_back(boost::asio::buffer(h.name)); + buffers.push_back(boost::asio::buffer(misc_strings::name_value_separator)); + buffers.push_back(boost::asio::buffer(h.value)); + buffers.push_back(boost::asio::buffer(misc_strings::crlf)); + } buffers.push_back(boost::asio::buffer(misc_strings::crlf)); - } - buffers.push_back(boost::asio::buffer(misc_strings::crlf)); + } buffers.push_back(boost::asio::buffer(content)); return buffers; } @@ -42,7 +49,7 @@ namespace util void HTTPConnection::Receive () { - m_Socket->async_read_some (boost::asio::buffer (m_Buffer), + m_Socket->async_read_some (boost::asio::buffer (m_Buffer, 8192), boost::bind(&HTTPConnection::HandleReceive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } @@ -51,7 +58,13 @@ namespace util { if (!ecode) { - HandleRequest (); + m_Buffer[bytes_transferred] = 0; + auto address = ExtractAddress (); + LogPrint (address); + if (address.length () > 1) // not just '/' + HandleDestinationRequest (address.substr (1)); // exclude '/' + else + HandleRequest (); boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), boost::bind (&HTTPConnection::HandleWrite, this, boost::asio::placeholders::error)); @@ -61,6 +74,18 @@ namespace util Terminate (); } + std::string HTTPConnection::ExtractAddress () + { + char * get = strstr (m_Buffer, "GET"); + if (get) + { + char * http = strstr (get, "HTTP"); + if (http) + return std::string (get + 4, http - get - 5); + } + return ""; + } + void HTTPConnection::HandleWrite (const boost::system::error_code& ecode) { Terminate (); @@ -121,14 +146,78 @@ namespace util s << "
"; } } + s << "

Flibusta

"; } + void HTTPConnection::HandleDestinationRequest (std::string b32) + { + uint8_t destination[32]; + i2p::data::Base32ToByteStream (b32.c_str (), b32.length (), destination, 32); + auto leaseSet = i2p::data::netdb.FindLeaseSet (destination); + if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) + { + i2p::data::netdb.RequestDestination (i2p::data::IdentHash (destination), true); + std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds + leaseSet = i2p::data::netdb.FindLeaseSet (destination); + if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet + { + m_Reply.content = leaseSet ? "Leases expired" : "LeaseSet not found"; + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + return; + } + } + // we found LeaseSet + if (leaseSet->HasExpiredLeases ()) + { + // we should re-request LeaseSet + LogPrint ("LeaseSet re-requested"); + i2p::data::netdb.RequestDestination (i2p::data::IdentHash (destination), true); + } + auto s = i2p::stream::CreateStream (leaseSet); + if (s) + { + std::string request = "GET / HTTP/1.1\n Host:" + b32 + ".b32.i2p\n"; + s->Send ((uint8_t *)request.c_str (), request.length (), 10); + std::stringstream ss; + uint8_t buf[8192]; + size_t r = s->Receive (buf, 8192, 30); // 30 seconds + if (!r && s->IsEstablished ()) // nothing received but connection is established + r = s->Receive (buf, 8192, 30); // wait for another 30 secondd + if (r) // we recieved data + { + ss << std::string ((char *)buf, r); + while (s->IsOpen () && (r = s->Receive (buf, 8192, 30)) > 0) + ss << std::string ((char *)buf,r); + + m_Reply.content = ss.str (); // send "as is" + m_Reply.headers.resize(0); // no headers + return; + } + else // nothing received + ss << "Not responding"; + s->Close (); + //DeleteStream (s); + + m_Reply.content = ss.str (); + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + } + } + HTTPServer::HTTPServer (int port): m_Thread (nullptr), m_Work (m_Service), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)), m_NewSocket (nullptr) { + } HTTPServer::~HTTPServer () diff --git a/HTTPServer.h b/HTTPServer.h index 84439c2a..90aacfc0 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -48,12 +48,14 @@ namespace util void HandleWrite(const boost::system::error_code& ecode); void HandleRequest (); + void HandleDestinationRequest (std::string b32); void FillContent (std::stringstream& s); - + std::string ExtractAddress (); + private: boost::asio::ip::tcp::socket * m_Socket; - boost::array m_Buffer; + char m_Buffer[8192]; request m_Request; reply m_Reply; }; diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index 9395c40c..502a2e40 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -63,20 +63,13 @@ namespace i2p { I2NPMessage * msg = NewI2NPMessage (); memcpy (msg->GetBuffer (), buf, len); - msg->len += msg->offset + len; + msg->len = msg->offset + len; return msg; } I2NPMessage * CreateDeliveryStatusMsg (uint32_t msgID) { -#pragma pack(1) - struct - { - uint32_t msgID; - uint64_t timestamp; - } msg; -#pragma pack () - + I2NPDeliveryStatusMsg msg; msg.msgID = htobe32 (msgID); msg.timestamp = htobe64 (i2p::util::GetMillisecondsSinceEpoch ()); return CreateI2NPMessage (eI2NPDeliveryStatus, (uint8_t *)&msg, sizeof (msg)); @@ -178,6 +171,7 @@ namespace i2p CryptoPP::Gzip compressor; compressor.Put ((uint8_t *)context.GetRouterInfo ().GetBuffer (), context.GetRouterInfo ().GetBufferLen ()); compressor.MessageEnd(); + // WARNING!!! MaxRetrievable() return uint64_t. Есть подозрение, что что-то не так int size = compressor.MaxRetrievable (); uint8_t * buf = m->GetPayload () + sizeof (I2NPDatabaseStoreMsg); *(uint16_t *)buf = htobe16 (size); // size @@ -431,6 +425,8 @@ namespace i2p break; case eI2NPDeliveryStatus: LogPrint ("DeliveryStatus"); + // we assume DeliveryStatusMessage is sent with garlic only + i2p::garlic::routing.HandleDeliveryStatusMessage (buf, size); break; case eI2NPVariableTunnelBuild: LogPrint ("VariableTunnelBuild"); diff --git a/I2NPProtocol.h b/I2NPProtocol.h index ebba0c5e..7f47601e 100644 --- a/I2NPProtocol.h +++ b/I2NPProtocol.h @@ -26,6 +26,11 @@ namespace i2p uint32_t replyToken; }; + struct I2NPDeliveryStatusMsg + { + uint32_t msgID; + uint64_t timestamp; + }; struct I2NPBuildRequestRecordClearText { diff --git a/LeaseSet.cpp b/LeaseSet.cpp index c948c343..efa6409f 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -1,8 +1,9 @@ +#include "I2PEndian.h" #include #include "CryptoConst.h" #include "Log.h" +#include "Timestamp.h" #include "LeaseSet.h" -#include "I2PEndian.h" namespace i2p { @@ -32,6 +33,7 @@ namespace data { Lease lease = *(Lease *)leases; lease.tunnelID = be32toh (lease.tunnelID); + lease.endDate = be64toh (lease.endDate); m_Leases.push_back (lease); leases += sizeof (Lease); } @@ -44,5 +46,31 @@ namespace data if (!verifier.VerifyMessage (buf, leases - buf, leases, 40)) LogPrint ("LeaseSet verification failed"); } + + std::vector LeaseSet::GetNonExpiredLeases () const + { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + std::vector leases; + for (auto& it: m_Leases) + if (ts < it.endDate) + leases.push_back (it); + return leases; + } + + bool LeaseSet::HasExpiredLeases () const + { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + for (auto& it: m_Leases) + if (ts >= it.endDate) return true; + return false; + } + + bool LeaseSet::HasNonExpiredLeases () const + { + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + for (auto& it: m_Leases) + if (ts < it.endDate) return true; + return false; + } } } diff --git a/LeaseSet.h b/LeaseSet.h index 5bb75741..294e32d0 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -27,11 +27,16 @@ namespace data public: LeaseSet (const uint8_t * buf, int len); - + LeaseSet (const LeaseSet& ) = default; + LeaseSet& operator=(const LeaseSet& ) = default; + // implements RoutingDestination const Identity& GetIdentity () const { return m_Identity; }; const IdentHash& GetIdentHash () const { return m_IdentHash; }; const std::vector& GetLeases () const { return m_Leases; }; + std::vector GetNonExpiredLeases () const; + bool HasExpiredLeases () const; + bool HasNonExpiredLeases () const; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionKey; }; bool IsDestination () const { return true; }; diff --git a/NetDb.cpp b/NetDb.cpp index 870027e7..9b454598 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -1,5 +1,6 @@ #include "I2PEndian.h" #include +#include #include #include #include "base64.h" @@ -119,10 +120,9 @@ namespace data if (r->GetTimestamp () > it->second->GetTimestamp ()) { LogPrint ("RouterInfo updated"); - *m_RouterInfos[r->GetIdentHash ()] = *r; // we can't replace point because it's used by tunnels - } - else - delete r; + *(it->second) = *r; // we can't replace pointer because it's used by tunnels + } + delete r; } else { @@ -135,7 +135,18 @@ namespace data { LeaseSet * l = new LeaseSet (buf, len); DeleteRequestedDestination (l->GetIdentHash ()); - m_LeaseSets[l->GetIdentHash ()] = l; + auto it = m_LeaseSets.find(l->GetIdentHash ()); + if (it != m_LeaseSets.end ()) + { + LogPrint ("LeaseSet updated"); + *(it->second) = *l; // we can't replace pointer because it's used by streams + delete l; + } + else + { + LogPrint ("New LeaseSet added"); + m_LeaseSets[l->GetIdentHash ()] = l; + } } RouterInfo * NetDb::FindRouter (const IdentHash& ident) const @@ -303,7 +314,8 @@ namespace data { i2p::tunnel::OutboundTunnel * outbound = dest->GetLastOutboundTunnel (); const i2p::tunnel::InboundTunnel * inbound = dest->GetLastReplyTunnel (); - + std::vector msgs; + for (int i = 0; i < num; i++) { uint8_t * router = buf + 33 + i*32; @@ -322,7 +334,11 @@ namespace data RequestedDestination * d1 = CreateRequestedDestination (router, false, false); d1->SetLastOutboundTunnel (outbound); auto msg = d1->CreateRequestMessage (dest->GetLastRouter (), dest->GetLastReplyTunnel ()); - outbound->GetTunnelGateway ().PutTunnelDataMsg (dest->GetLastRouter ()->GetIdentHash (), 0, msg); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + dest->GetLastRouter ()->GetIdentHash (), 0, msg + }); } } else @@ -337,11 +353,15 @@ namespace data // do we have that floodfill router in our database? if (r) { - if (!dest->IsExcluded (r->GetIdentHash ()) && dest->GetNumExcludedPeers () < 10) // TODO: fix TunnelGateway first + if (!dest->IsExcluded (r->GetIdentHash ()) && dest->GetNumExcludedPeers () < 30) // TODO: fix TunnelGateway first { // request destination auto msg = dest->CreateRequestMessage (r, dest->GetLastReplyTunnel ()); - outbound->GetTunnelGateway ().PutTunnelDataMsg (r->GetIdentHash (), 0, msg); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + r->GetIdentHash (), 0, msg + }); } } else @@ -351,15 +371,18 @@ namespace data RequestedDestination * d2 = CreateRequestedDestination (router, false, false); d2->SetLastOutboundTunnel (outbound); I2NPMessage * msg = d2->CreateRequestMessage (dest->GetLastRouter (), inbound); - outbound->GetTunnelGateway ().PutTunnelDataMsg ( - dest->GetLastRouter ()->GetIdentHash (), 0, msg); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + dest->GetLastRouter ()->GetIdentHash (), 0, msg + }); } } } } - if (outbound) - outbound->GetTunnelGateway ().SendBuffer (); + if (msgs.size () > 0) + outbound->SendTunnelDataMsg (msgs); } else { @@ -388,12 +411,21 @@ namespace data rnd.GenerateBlock (randomHash, 32); RequestedDestination * dest = CreateRequestedDestination (IdentHash (randomHash), false, true); dest->SetLastOutboundTunnel (outbound); - - outbound->GetTunnelGateway ().PutTunnelDataMsg (floodfill->GetIdentHash (), 0, - CreateDatabaseStoreMsg ()); // tell floodfill about us - outbound->GetTunnelGateway ().PutTunnelDataMsg (floodfill->GetIdentHash (), 0, - dest->CreateRequestMessage (floodfill, inbound)); // explore - outbound->GetTunnelGateway ().SendBuffer (); + + std::vector msgs; + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + floodfill->GetIdentHash (), 0, + CreateDatabaseStoreMsg () // tell floodfill about us + }); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeRouter, + floodfill->GetIdentHash (), 0, + dest->CreateRequestMessage (floodfill, inbound) // explore + }); + outbound->SendTunnelDataMsg (msgs); } } } diff --git a/Queue.h b/Queue.h index 31e8e0a4..ae60f099 100644 --- a/Queue.h +++ b/Queue.h @@ -45,6 +45,18 @@ namespace util } return el; } + + bool Wait (int sec, int usec) + { + std::unique_lock l(m_QueueMutex); + return m_NonEmpty.wait_for (l, std::chrono::seconds (sec) + std::chrono::milliseconds (usec)) != std::cv_status::timeout; + } + + bool IsEmpty () + { + std::unique_lock l(m_QueueMutex); + return m_Queue.empty (); + } void WakeUp () { m_NonEmpty.notify_one (); }; @@ -54,14 +66,21 @@ namespace util return GetNonThreadSafe (); } + Element * Peek () + { + std::unique_lock l(m_QueueMutex); + return GetNonThreadSafe (true); + } + private: - Element * GetNonThreadSafe () + Element * GetNonThreadSafe (bool peek = false) { if (!m_Queue.empty ()) { Element * el = m_Queue.front (); - m_Queue.pop (); + if (!peek) + m_Queue.pop (); return el; } return nullptr; diff --git a/Streaming.cpp b/Streaming.cpp index fc746ebc..99d51618 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,5 +1,6 @@ #include "I2PEndian.h" #include +#include #include #include "Log.h" #include "RouterInfo.h" @@ -15,18 +16,26 @@ namespace i2p namespace stream { Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LocalDestination (local), m_RemoteLeaseSet (remote) + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), + m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } - void Stream::HandleNextPacket (const uint8_t * buf, size_t len) + Stream::~Stream () { - const uint8_t * end = buf + len; + while (auto packet = m_ReceiveQueue.Get ()) + delete packet; + } + + void Stream::HandleNextPacket (Packet * packet) + { + const uint8_t * buf = packet->buf; buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); - buf += 4; // receiveStreamID + buf += 4; // receiveStreamID + uint32_t receivedSeqn = be32toh (*(uint32_t *)buf); buf += 4; // sequenceNum buf += 4; // ackThrough int nackCount = buf[0]; @@ -45,7 +54,7 @@ namespace stream { LogPrint ("Synchronize"); } - + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { LogPrint ("Signature"); @@ -57,14 +66,61 @@ namespace stream LogPrint ("From identity"); optionalData += sizeof (i2p::data::Identity); } - + // we have reached payload section - std::string str((const char *)buf, end-buf); - LogPrint ("Payload: ", str); + LogPrint ("seqn=", receivedSeqn, ", flags=", flags); + if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) + { + // we have received next message + packet->offset = buf - packet->buf; + if (packet->GetLength () > 0) + m_ReceiveQueue.Put (packet); + else + delete packet; + + m_LastReceivedSequenceNumber = receivedSeqn; + SendQuickAck (); + } + else + { + if (receivedSeqn <= m_LastReceivedSequenceNumber) + { + // we have received duplicate. Most likely our outbound tunnel is dead + LogPrint ("Duplicate message ", receivedSeqn, " received"); + m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel + if (m_OutboundTunnel) + SendQuickAck (); // resend ack for previous message again + } + else + { + LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); + // actually do nothing. just wait for missing message again + } + delete packet; // packet dropped + } + + if (flags & PACKET_FLAG_CLOSE) + { + LogPrint ("Closed"); + m_IsOpen = false; + m_ReceiveQueue.WakeUp (); + } } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) { + if (!m_IsOpen) + ConnectAndSend (buf, len); + else + { + // TODO: implement + } + return len; + } + + void Stream::ConnectAndSend (uint8_t * buf, size_t len) + { + m_IsOpen = true; uint8_t packet[STREAMING_MTU]; size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); @@ -80,30 +136,149 @@ namespace stream size++; // resend delay // TODO: for initial packet only, following packets have different falgs *(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_SYNCHRONIZE | - PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_NO_ACK); + PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | + PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK); size += 2; // flags - *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40); // identity + signature + *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40 + 2); // identity + signature + packet size size += 2; // options size memcpy (packet + size, &m_LocalDestination->GetIdentity (), sizeof (i2p::data::Identity)); size += sizeof (i2p::data::Identity); // from + *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); + size += 2; // max packet size uint8_t * signature = packet + size; // set it later memset (signature, 0, 40); // zeroes for now size += 40; // signature + memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); - I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ()); - auto outbound = i2p::tunnel::tunnels.GetNextOutboundTunnel (); - if (outbound) + if (!m_OutboundTunnel) + m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); + if (m_OutboundTunnel) { auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: - outbound->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); } else DeleteI2NPMessage (msg); - return len; + } + + void Stream::SendQuickAck () + { + uint8_t packet[STREAMING_MTU]; + size_t size = 0; + *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); + size += 4; // sendStreamID + *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); + size += 4; // receiveStreamID + *(uint32_t *)(packet + size) = 0; // this is plain Ack message + size += 4; // sequenceNum + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + *(uint16_t *)(packet + size) = 0; // nof flags set + size += 2; // flags + *(uint16_t *)(packet + size) = 0; // no options + size += 2; // options size + + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size)); + if (m_OutboundTunnel) + { + auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (); + if (!leases.empty ()) + { + auto& lease = leases[0]; // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + LogPrint ("Quick Ack sent"); + } + else + { + LogPrint ("All leases are expired"); + DeleteI2NPMessage (msg); + } + } + else + DeleteI2NPMessage (msg); + } + + void Stream::Close () + { + if (m_IsOpen) + { + m_IsOpen = false; + uint8_t packet[STREAMING_MTU]; + size_t size = 0; + *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); + size += 4; // sendStreamID + *(uint32_t *)(packet + size) = htobe32 (m_RecvStreamID); + size += 4; // receiveStreamID + *(uint32_t *)(packet + size) = htobe32 (m_SequenceNumber); + size += 4; // sequenceNum + *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + *(uint16_t *)(packet + size) = PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED; + size += 2; // flags + *(uint16_t *)(packet + size) = htobe16 (40); // 40 bytes signature + size += 2; // options size + uint8_t * signature = packet + size; + memset (packet + size, 0, 40); + size += 40; // signature + m_LocalDestination->Sign (packet, size, signature); + + I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size)); + if (m_OutboundTunnel) + { + auto& lease = m_RemoteLeaseSet->GetLeases ()[0]; // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + LogPrint ("FIN sent"); + } + else + DeleteI2NPMessage (msg); + m_ReceiveQueue.WakeUp (); + } + } + + size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) + { + if (!m_IsOpen) return 0; + if (m_ReceiveQueue.IsEmpty ()) + { + if (!timeout) return 0; + if (!m_ReceiveQueue.Wait (timeout, 0)) + return 0; + } + + // either non-empty or we have received empty + size_t pos = 0; + while (pos < len) + { + Packet * packet = m_ReceiveQueue.Peek (); + if (packet) + { + size_t l = std::min (packet->GetLength (), len - pos); + memcpy (buf + pos, packet->GetBuffer (), l); + pos += l; + packet->offset += l; + if (!packet->GetLength ()) + { + m_ReceiveQueue.Get (); + delete packet; + } + } + else // no more data available + break; + } + return pos; } StreamingDestination * sharedLocalDestination = nullptr; @@ -124,14 +299,17 @@ namespace stream DeleteI2NPMessage (m_LeaseSet); } - void StreamingDestination::HandleNextPacket (const uint8_t * buf, size_t len) + void StreamingDestination::HandleNextPacket (Packet * packet) { - uint32_t sendStreamID = be32toh (*(uint32_t *)(buf)); + uint32_t sendStreamID = be32toh (*(uint32_t *)(packet->buf)); auto it = m_Streams.find (sendStreamID); if (it != m_Streams.end ()) - it->second->HandleNextPacket (buf, len); + it->second->HandleNextPacket (packet); else + { LogPrint ("Unknown stream ", sendStreamID); + delete packet; + } } Stream * StreamingDestination::CreateNewStream (const i2p::data::LeaseSet * remote) @@ -175,11 +353,12 @@ namespace stream size += 256; // encryption key memset (buf + size, 0, 128); size += 128; // signing key - auto tunnel = i2p::tunnel::tunnels.GetNextInboundTunnel (); - if (tunnel) + auto tunnels = i2p::tunnel::tunnels.GetInboundTunnels (5); // 5 tunnels maximum + buf[size] = tunnels.size (); // num leases + size++; // num + for (auto it: tunnels) { - buf[size] = 1; // 1 lease - size++; // num + auto tunnel = it; memcpy (buf + size, (const uint8_t *)tunnel->GetNextIdentHash (), 32); size += 32; // tunnel_gw *(uint32_t *)(buf + size) = htobe32 (tunnel->GetNextTunnelID ()); @@ -189,11 +368,6 @@ namespace stream *(uint64_t *)(buf + size) = htobe64 (ts); size += 8; // end_date } - else - { - buf[size] = 0; // zero leases - size++; // num - } Sign (buf, size, buf+ size); size += 40; // signature @@ -215,7 +389,7 @@ namespace stream return sharedLocalDestination->CreateNewStream (remote); } - void CloseStream (Stream * stream) + void DeleteStream (Stream * stream) { if (sharedLocalDestination) sharedLocalDestination->DeleteStream (stream); @@ -232,13 +406,19 @@ namespace stream CryptoPP::Gunzip decompressor; decompressor.Put (buf, length); decompressor.MessageEnd(); - uint8_t uncompressed[2048]; - int uncompressedSize = decompressor.MaxRetrievable (); - decompressor.Get (uncompressed, uncompressedSize); + Packet * uncompressed = new Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + if (uncompressed->len > MAX_PACKET_SIZE) + { + LogPrint ("Recieved packet size exceeds mac packer size"); + uncompressed->len = MAX_PACKET_SIZE; + } + decompressor.Get (uncompressed->buf, uncompressed->len); // then forward to streaming engine // TODO: we have onle one destination, might be more if (sharedLocalDestination) - sharedLocalDestination->HandleNextPacket (uncompressed, uncompressedSize); + sharedLocalDestination->HandleNextPacket (uncompressed); } else LogPrint ("Data: protocol ", buf[9], " is not supported"); @@ -248,6 +428,7 @@ namespace stream { I2NPMessage * msg = NewI2NPMessage (); CryptoPP::Gzip compressor; + compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); compressor.Put (payload, len); compressor.MessageEnd(); int size = compressor.MaxRetrievable (); @@ -255,6 +436,7 @@ namespace stream *(uint32_t *)buf = htobe32 (size); // length buf += 4; compressor.Get (buf, size); + memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later buf[9] = 6; // streaming protocol msg->len += size + 4; FillI2NPMessageHeader (msg, eI2NPData); diff --git a/Streaming.h b/Streaming.h index 26e84e3a..cc772190 100644 --- a/Streaming.h +++ b/Streaming.h @@ -4,9 +4,11 @@ #include #include #include +#include "Queue.h" #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" +#include "Tunnel.h" namespace i2p { @@ -24,27 +26,50 @@ namespace stream const uint16_t PACKET_FLAG_ECHO = 0x0200; const uint16_t PACKET_FLAG_NO_ACK = 0x0400; - const size_t STREAMING_MTU = 1730; - + const size_t STREAMING_MTU = 1730; + const size_t MAX_PACKET_SIZE = 1754; + + struct Packet + { + uint8_t buf[1754]; + size_t len, offset; + + Packet (): len (0), offset (0) {}; + uint8_t * GetBuffer () { return buf + offset; }; + size_t GetLength () const { return len - offset; }; + }; + class StreamingDestination; class Stream { public: Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote); + ~Stream (); uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; const i2p::data::LeaseSet * GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; + bool IsOpen () const { return m_IsOpen; }; bool IsEstablished () const { return m_SendStreamID; }; - void HandleNextPacket (const uint8_t * buf, size_t len); + void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds + size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired + void Close (); + + private: + + void ConnectAndSend (uint8_t * buf, size_t len); + void SendQuickAck (); private: - uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; + bool m_IsOpen; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; + i2p::util::Queue m_ReceiveQueue; + i2p::tunnel::OutboundTunnel * m_OutboundTunnel; }; class StreamingDestination @@ -61,7 +86,7 @@ namespace stream Stream * CreateNewStream (const i2p::data::LeaseSet * remote); void DeleteStream (Stream * stream); - void HandleNextPacket (const uint8_t * buf, size_t len); + void HandleNextPacket (Packet * packet); private: @@ -80,7 +105,7 @@ namespace stream }; Stream * CreateStream (const i2p::data::LeaseSet * remote); - void CloseStream (Stream * stream); + void DeleteStream (Stream * stream); // assuming data is I2CP message void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len); diff --git a/Transports.cpp b/Transports.cpp index e9953eca..371ea2e6 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -142,7 +142,14 @@ namespace i2p session = new i2p::ntcp::NTCPClient (m_Service, address->host.c_str (), address->port, *r); AddNTCPSession (session); } + else + LogPrint ("No NTCP addresses available"); } + else + { + LogPrint ("Router not found. Requested"); + i2p::data::netdb.RequestDestination (ident); + } } if (session) session->SendI2NPMessage (msg); diff --git a/Tunnel.cpp b/Tunnel.cpp index 2a87a939..f92337f7 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -138,13 +138,29 @@ namespace tunnel { m_Gateway.SendTunnelDataMsg (gwHash, gwTunnel, msg); } - - void OutboundTunnel::SendTunnelDataMsg (i2p::I2NPMessage * msg) + + void OutboundTunnel::SendTunnelDataMsg (std::vector msgs) { - SendTunnelDataMsg (nullptr, 0, msg); + for (auto& it : msgs) + { + switch (it.deliveryType) + { + case eDeliveryTypeLocal: + m_Gateway.SendTunnelDataMsg (nullptr, 0, it.data); + break; + case eDeliveryTypeTunnel: + m_Gateway.SendTunnelDataMsg (it.hash, it.tunnelID, it.data); + break; + case eDeliveryTypeRouter: + m_Gateway.SendTunnelDataMsg (it.hash, 0, it.data); + break; + default: + LogPrint ("Unexpected delivery type ", (int)it.deliveryType); + } + } + m_Gateway.SendBuffer (); } - - + Tunnels tunnels; Tunnels::Tunnels (): m_IsRunning (false), m_IsTunnelCreated (false), @@ -211,6 +227,23 @@ namespace tunnel } return tunnel; } + + std::vector Tunnels::GetInboundTunnels (int num) const + { + std::vector v; + int i = 0; + for (auto it : m_InboundTunnels) + { + if (i >= num) break; + if (it.second->GetNextIdentHash () != i2p::context.GetRouterInfo ().GetIdentHash ()) + { + // exclude one hop tunnels + v.push_back (it.second); + i++; + } + } + return v; + } OutboundTunnel * Tunnels::GetNextOutboundTunnel () { diff --git a/Tunnel.h b/Tunnel.h index cec82958..a189da0d 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -64,10 +65,9 @@ namespace tunnel OutboundTunnel (TunnelConfig * config): Tunnel (config), m_Gateway (this) {}; - void SendTunnelDataMsg (i2p::I2NPMessage * msg); //local void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, i2p::I2NPMessage * msg); - - TunnelGateway& GetTunnelGateway () { return m_Gateway; }; + void SendTunnelDataMsg (std::vector msgs); // multiple messages + size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }; // implements TunnelBase @@ -107,6 +107,7 @@ namespace tunnel InboundTunnel * GetInboundTunnel (uint32_t tunnelID); Tunnel * GetPendingTunnel (uint32_t replyMsgID); InboundTunnel * GetNextInboundTunnel (); + std::vector GetInboundTunnels (int num) const; OutboundTunnel * GetNextOutboundTunnel (); TransitTunnel * GetTransitTunnel (uint32_t tunnelID); void AddTransitTunnel (TransitTunnel * tunnel); diff --git a/TunnelBase.h b/TunnelBase.h index bd760989..6be902ca 100644 --- a/TunnelBase.h +++ b/TunnelBase.h @@ -4,6 +4,7 @@ #include #include "Timestamp.h" #include "I2NPProtocol.h" +#include "Identity.h" namespace i2p { @@ -22,8 +23,8 @@ namespace tunnel struct TunnelMessageBlock { TunnelDeliveryType deliveryType; + i2p::data::IdentHash hash; uint32_t tunnelID; - uint8_t hash[32]; I2NPMessage * data; }; @@ -31,6 +32,7 @@ namespace tunnel { public: + //WARNING!!! GetSecondsSinceEpoch() return uint64_t TunnelBase (): m_CreationTime (i2p::util::GetSecondsSinceEpoch ()) {}; virtual ~TunnelBase () {}; diff --git a/TunnelEndpoint.cpp b/TunnelEndpoint.cpp index c5ec73b4..0e23a837 100644 --- a/TunnelEndpoint.cpp +++ b/TunnelEndpoint.cpp @@ -41,12 +41,12 @@ namespace tunnel LogPrint ("Delivery type tunnel"); m.tunnelID = be32toh (*(uint32_t *)fragment); fragment += 4; // tunnelID - memcpy (m.hash, fragment, 32); + m.hash = i2p::data::IdentHash (fragment); fragment += 32; // hash break; case eDeliveryTypeRouter: // 2 LogPrint ("Delivery type router"); - memcpy (m.hash, fragment, 32); + m.hash = i2p::data::IdentHash (fragment); fragment += 32; // to hash break; default: @@ -143,7 +143,7 @@ namespace tunnel void TunnelEndpoint::HandleNextMessage (const TunnelMessageBlock& msg) { - LogPrint ("TunnelMessage: handle fragment of ", msg.data->GetLength ()," bytes"); + LogPrint ("TunnelMessage: handle fragment of ", msg.data->GetLength ()," bytes. Msg type ", (int)msg.data->GetHeader()->typeID); switch (msg.deliveryType) { case eDeliveryTypeLocal: diff --git a/i2p.cpp b/i2p.cpp index be007d15..e2b44dfe 100644 --- a/i2p.cpp +++ b/i2p.cpp @@ -1,6 +1,6 @@ #include +#include #include -#include #include "Log.h" #include "base64.h" #include "Transports.h" @@ -20,7 +20,7 @@ int main( int, char** ) i2p::transports.Start (); i2p::tunnel::tunnels.Start (); - boost::this_thread::sleep(boost::posix_time::seconds(1000)); + std::this_thread::sleep_for (std::chrono::seconds(10000)); i2p::tunnel::tunnels.Stop (); i2p::transports.Stop (); i2p::data::netdb.Stop ();