From 31295a663bab954e90e992dfa5e62c4ab9ab1366 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 18 Apr 2014 19:27:39 -0400 Subject: [PATCH 1/3] common Send method --- Streaming.cpp | 66 +++++++++++++++++++++++++++------------------------ Streaming.h | 3 +-- 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 1baffd46..9a23c313 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -138,20 +138,8 @@ namespace stream } } - size_t Stream::Send (uint8_t * buf, size_t len, int timeout) + size_t Stream::Send (const uint8_t * buf, size_t len, int timeout) { - if (!m_IsOpen) - ConnectAndSend (buf, len); - else - { - // TODO: implement - } - return len; - } - - void Stream::ConnectAndSend (uint8_t * buf, size_t len) - { - m_IsOpen = true; Packet * p = new Packet (); uint8_t * packet = p->GetBuffer (); // TODO: implement setters @@ -167,27 +155,43 @@ namespace stream packet[size] = 0; size++; // NACK count size++; // resend delay - // TODO: for initial packet only, following packets have different falgs - *(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_SYNCHRONIZE | - PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | - PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK); - size += 2; // flags - *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40 + 2); // identity + signature + packet size - size += 2; // options size - memcpy (packet + size, &m_LocalDestination->GetIdentity (), sizeof (i2p::data::Identity)); - size += sizeof (i2p::data::Identity); // from - *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); - size += 2; // max packet size - uint8_t * signature = packet + size; // set it later - memset (signature, 0, 40); // zeroes for now - size += 40; // signature - memcpy (packet + size, buf, len); - size += len; // payload - m_LocalDestination->Sign (packet, size, signature); + if (!m_IsOpen) + { + // initial packet + m_IsOpen = true; + *(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_SYNCHRONIZE | + PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | + PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK); + size += 2; // flags + *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40 + 2); // identity + signature + packet size + size += 2; // options size + memcpy (packet + size, &m_LocalDestination->GetIdentity (), sizeof (i2p::data::Identity)); + size += sizeof (i2p::data::Identity); // from + *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); + size += 2; // max packet size + uint8_t * signature = packet + size; // set it later + memset (signature, 0, 40); // zeroes for now + size += 40; // signature + memcpy (packet + size, buf, len); + size += len; // payload + m_LocalDestination->Sign (packet, size, signature); + } + else + { + // follow on packet + *(uint16_t *)(packet + size) = 0; + size += 2; // flags + *(uint16_t *)(packet + size) = 0; // no options + size += 2; // options size + memcpy (packet + size, buf, len); + size += len; // payload + } p->len = size; - m_Service.post (boost::bind (&Stream::SendPacket, this, p)); + + return len; } + void Stream::SendQuickAck () { diff --git a/Streaming.h b/Streaming.h index 3a0e5114..88fd7c20 100644 --- a/Streaming.h +++ b/Streaming.h @@ -79,7 +79,7 @@ namespace stream bool IsEstablished () const { return m_SendStreamID; }; void HandleNextPacket (Packet * packet); - size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds + size_t Send (const uint8_t * buf, size_t len, int timeout); // timeout in seconds template void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); @@ -90,7 +90,6 @@ namespace stream private: - void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); bool SendPacket (Packet * packet); bool SendPacket (const uint8_t * buf, size_t len); From bf2e833f261bedb639b2f26e3beaededc973600c Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 19 Apr 2014 20:45:41 -0400 Subject: [PATCH 2/3] separate thread for SSU server --- SSU.cpp | 31 ++++++++++++++++++++++++++++--- SSU.h | 10 ++++++++-- Transports.cpp | 2 +- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/SSU.cpp b/SSU.cpp index 761d0e6b..2f2baf30 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -992,8 +992,8 @@ namespace ssu m_Server.Send (buf, msgSize, m_RemoteEndpoint); } - SSUServer::SSUServer (boost::asio::io_service& service, int port): - m_Endpoint (boost::asio::ip::udp::v4 (), port), m_Socket (service, m_Endpoint) + SSUServer::SSUServer (int port): m_Thread (nullptr), m_Work (m_Service), + m_Endpoint (boost::asio::ip::udp::v4 (), port), m_Socket (m_Service, m_Endpoint) { m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535)); @@ -1007,15 +1007,40 @@ namespace ssu void SSUServer::Start () { - Receive (); + m_IsRunning = true; + m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); + m_Service.post (boost::bind (&SSUServer::Receive, this)); } void SSUServer::Stop () { DeleteAllSessions (); + m_IsRunning = false; + m_Service.stop (); m_Socket.close (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = 0; + } } + void SSUServer::Run () + { + while (m_IsRunning) + { + try + { + m_Service.run (); + } + catch (std::exception& ex) + { + LogPrint ("SSU server: ", ex.what ()); + } + } + } + void SSUServer::AddRelay (uint32_t tag, const boost::asio::ip::udp::endpoint& relay) { m_Relays[tag] = relay; diff --git a/SSU.h b/SSU.h index 95198d21..2d874335 100644 --- a/SSU.h +++ b/SSU.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -154,7 +155,7 @@ namespace ssu { public: - SSUServer (boost::asio::io_service& service, int port); + SSUServer (int port); ~SSUServer (); void Start (); void Stop (); @@ -172,11 +173,16 @@ namespace ssu private: + void Run (); void Receive (); void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred); private: - + + bool m_IsRunning; + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; boost::asio::ip::udp::endpoint m_Endpoint; boost::asio::ip::udp::socket m_Socket; boost::asio::ip::udp::endpoint m_SenderEndpoint; diff --git a/Transports.cpp b/Transports.cpp index d7bdd634..249e5f0e 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -110,7 +110,7 @@ namespace i2p { if (!m_SSUServer) { - m_SSUServer = new i2p::ssu::SSUServer (m_Service, address.port); + m_SSUServer = new i2p::ssu::SSUServer (address.port); LogPrint ("Start listening UDP port ", address.port); m_SSUServer->Start (); DetectExternalIP (); From 9ec671ba69f395fee5efcf8fbc14ac73552b677d Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 22 Apr 2014 11:39:26 -0400 Subject: [PATCH 3/3] split SSU to SSU and SSUData --- Makefile | 2 +- SSU.cpp | 121 ++------------------------------------------- SSU.h | 22 ++------- SSUData.cpp | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++ SSUData.h | 48 ++++++++++++++++++ 5 files changed, 196 insertions(+), 135 deletions(-) create mode 100644 SSUData.cpp create mode 100644 SSUData.h diff --git a/Makefile b/Makefile index 2d30a01c..33f513bf 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ OBJECTS = obj/i2p.o obj/base64.o obj/NTCPSession.o obj/RouterInfo.o obj/Transpor obj/TunnelGateway.o obj/TransitTunnel.o obj/I2NPProtocol.o obj/Log.o obj/Garlic.o \ obj/HTTPServer.o obj/Streaming.o obj/Identity.o obj/SSU.o obj/util.o obj/Reseed.o \ obj/UPnP.o obj/TunnelPool.o obj/HTTPProxy.o obj/AddressBook.o \ - obj/Daemon.o obj/DaemonLinux.o + obj/Daemon.o obj/DaemonLinux.o obj/SSUData.o INCFLAGS = LDFLAGS = -Wl,-rpath,/usr/local/lib -lcryptopp -lboost_system -lboost_filesystem -lboost_regex -lboost_program_options -lpthread LIBS = diff --git a/SSU.cpp b/SSU.cpp index 2f2baf30..32dbf681 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -19,20 +19,14 @@ namespace ssu const i2p::data::RouterInfo * router, bool peerTest ): m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_RemoteRouter (router), m_Timer (m_Server.GetService ()), m_PeerTest (peerTest), m_State (eSessionStateUnknown), - m_IsSessionKey (false), m_RelayTag (0) + m_IsSessionKey (false), m_RelayTag (0), m_Data (*this) { m_DHKeysPair = i2p::transports.GetNextDHKeysPair (); } SSUSession::~SSUSession () { - delete m_DHKeysPair; - for (auto it: m_IncomleteMessages) - if (it.second) - { - DeleteI2NPMessage (it.second->msg); - delete it.second; - } + delete m_DHKeysPair; } void SSUSession::CreateAESandMacKey (const uint8_t * pubKey, uint8_t * aesKey, uint8_t * macKey) @@ -603,6 +597,7 @@ namespace ssu void SSUSession::Established () { + m_State = eSessionStateEstablished; SendI2NPMessage (CreateDatabaseStoreMsg ()); if (!m_DelayedMessages.empty ()) { @@ -671,115 +666,7 @@ namespace ssu void SSUSession::ProcessData (uint8_t * buf, size_t len) { - //uint8_t * start = buf; - uint8_t flag = *buf; - buf++; - LogPrint ("Process SSU data flags=", (int)flag); - if (flag & DATA_FLAG_EXPLICIT_ACKS_INCLUDED) - { - // explicit ACKs - uint8_t numAcks =*buf; - buf++; - // TODO: process ACKs - buf += numAcks*4; - } - if (flag & DATA_FLAG_ACK_BITFIELDS_INCLUDED) - { - // explicit ACK bitfields - uint8_t numBitfields =*buf; - buf++; - for (int i = 0; i < numBitfields; i++) - { - buf += 4; // msgID - // TODO: process ACH bitfields - while (*buf & 0x80) // not last - buf++; - buf++; // last byte - } - } - uint8_t numFragments = *buf; // number of fragments - buf++; - for (int i = 0; i < numFragments; i++) - { - uint32_t msgID = be32toh (*(uint32_t *)buf); // message ID - buf += 4; - uint8_t frag[4]; - frag[0] = 0; - memcpy (frag + 1, buf, 3); - buf += 3; - uint32_t fragmentInfo = be32toh (*(uint32_t *)frag); // fragment info - uint16_t fragmentSize = fragmentInfo & 0x1FFF; // bits 0 - 13 - bool isLast = fragmentInfo & 0x010000; // bit 16 - uint8_t fragmentNum = fragmentInfo >> 17; // bits 23 - 17 - LogPrint ("SSU data fragment ", (int)fragmentNum, " of message ", msgID, " size=", (int)fragmentSize, isLast ? " last" : " non-last"); - I2NPMessage * msg = nullptr; - if (fragmentNum > 0) // follow-up fragment - { - auto it = m_IncomleteMessages.find (msgID); - if (it != m_IncomleteMessages.end ()) - { - if (fragmentNum == it->second->nextFragmentNum) - { - // expected fragment - msg = it->second->msg; - memcpy (msg->buf + msg->len, buf, fragmentSize); - msg->len += fragmentSize; - it->second->nextFragmentNum++; - } - else if (fragmentNum < it->second->nextFragmentNum) - // duplicate fragment - LogPrint ("Duplicate fragment ", fragmentNum, " of message ", msgID, ". Ignored"); - else - { - // missing fragment - LogPrint ("Missing fragments from ", it->second->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID); - //TODO - } - - if (isLast) - { - delete it->second; - m_IncomleteMessages.erase (it); - } - } - else - // TODO: - LogPrint ("Unexpected follow-on fragment ", fragmentNum, " of message ", msgID); - } - else // first fragment - { - msg = NewI2NPMessage (); - memcpy (msg->GetSSUHeader (), buf, fragmentSize); - msg->len += fragmentSize - sizeof (I2NPHeaderShort); - } - - if (msg) - { - if (!fragmentNum && !isLast) - m_IncomleteMessages[msgID] = new IncompleteMessage (msg); - if (isLast) - { - SendMsgAck (msgID); - msg->FromSSU (msgID); - if (m_State == eSessionStateEstablished) - i2p::HandleI2NPMessage (msg); - else - { - // we expect DeliveryStatus - if (msg->GetHeader ()->typeID == eI2NPDeliveryStatus) - { - LogPrint ("SSU session established"); - m_State = eSessionStateEstablished; - Established (); - } - else - LogPrint ("SSU unexpected message ", (int)msg->GetHeader ()->typeID); - DeleteI2NPMessage (msg); - } - } - } - buf += fragmentSize; - } + m_Data.ProcessMessage (buf, len); } diff --git a/SSU.h b/SSU.h index 2d874335..16236a38 100644 --- a/SSU.h +++ b/SSU.h @@ -13,6 +13,7 @@ #include "Identity.h" #include "RouterInfo.h" #include "I2NPProtocol.h" +#include "SSUData.h" namespace i2p { @@ -45,14 +46,6 @@ namespace ssu const uint8_t PAYLOAD_TYPE_PEER_TEST = 7; const uint8_t PAYLOAD_TYPE_SESSION_DESTROYED = 8; - // data flags - const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02; - const uint8_t DATA_FLAG_WANT_REPLY = 0x04; - const uint8_t DATA_FLAG_REQUEST_PREVIOUS_ACKS = 0x08; - const uint8_t DATA_FLAG_EXPLICIT_CONGESTION_NOTIFICATION = 0x10; - const uint8_t DATA_FLAG_ACK_BITFIELDS_INCLUDED = 0x40; - const uint8_t DATA_FLAG_EXPLICIT_ACKS_INCLUDED = 0x80; - enum SessionState { eSessionStateUnknown, @@ -88,6 +81,8 @@ namespace ssu void SendI2NPMessage (I2NPMessage * msg); void SendPeerTest (); // Alice + SessionState GetState () const { return m_State; }; + private: void CreateAESandMacKey (const uint8_t * pubKey, uint8_t * aesKey, uint8_t * macKey); @@ -125,15 +120,8 @@ namespace ssu void HandleTerminationTimer (const boost::system::error_code& ecode); private: - - struct IncompleteMessage - { - I2NPMessage * msg; - uint8_t nextFragmentNum; - - IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (1) {}; - }; + friend class SSUData; // TODO: change in later SSUServer& m_Server; boost::asio::ip::udp::endpoint m_RemoteEndpoint; const i2p::data::RouterInfo * m_RemoteRouter; @@ -147,8 +135,8 @@ namespace ssu CryptoPP::CBC_Mode::Encryption m_Encryption; CryptoPP::CBC_Mode::Decryption m_Decryption; uint8_t m_SessionKey[32], m_MacKey[32]; - std::map m_IncomleteMessages; std::list m_DelayedMessages; + SSUData m_Data; }; class SSUServer diff --git a/SSUData.cpp b/SSUData.cpp new file mode 100644 index 00000000..2f3cbec2 --- /dev/null +++ b/SSUData.cpp @@ -0,0 +1,138 @@ +#include "Log.h" +#include "SSU.h" +#include "SSUData.h" + +namespace i2p +{ +namespace ssu +{ + SSUData::SSUData (SSUSession& session): + m_Session (session) + { + } + + SSUData::~SSUData () + { + for (auto it: m_IncomleteMessages) + if (it.second) + { + DeleteI2NPMessage (it.second->msg); + delete it.second; + } + } + + void SSUData::ProcessMessage (uint8_t * buf, size_t len) + { + //uint8_t * start = buf; + uint8_t flag = *buf; + buf++; + LogPrint ("Process SSU data flags=", (int)flag); + if (flag & DATA_FLAG_EXPLICIT_ACKS_INCLUDED) + { + // explicit ACKs + uint8_t numAcks =*buf; + buf++; + // TODO: process ACKs + buf += numAcks*4; + } + if (flag & DATA_FLAG_ACK_BITFIELDS_INCLUDED) + { + // explicit ACK bitfields + uint8_t numBitfields =*buf; + buf++; + for (int i = 0; i < numBitfields; i++) + { + buf += 4; // msgID + // TODO: process ACH bitfields + while (*buf & 0x80) // not last + buf++; + buf++; // last byte + } + } + uint8_t numFragments = *buf; // number of fragments + buf++; + for (int i = 0; i < numFragments; i++) + { + uint32_t msgID = be32toh (*(uint32_t *)buf); // message ID + buf += 4; + uint8_t frag[4]; + frag[0] = 0; + memcpy (frag + 1, buf, 3); + buf += 3; + uint32_t fragmentInfo = be32toh (*(uint32_t *)frag); // fragment info + uint16_t fragmentSize = fragmentInfo & 0x1FFF; // bits 0 - 13 + bool isLast = fragmentInfo & 0x010000; // bit 16 + uint8_t fragmentNum = fragmentInfo >> 17; // bits 23 - 17 + LogPrint ("SSU data fragment ", (int)fragmentNum, " of message ", msgID, " size=", (int)fragmentSize, isLast ? " last" : " non-last"); + I2NPMessage * msg = nullptr; + if (fragmentNum > 0) // follow-up fragment + { + auto it = m_IncomleteMessages.find (msgID); + if (it != m_IncomleteMessages.end ()) + { + if (fragmentNum == it->second->nextFragmentNum) + { + // expected fragment + msg = it->second->msg; + memcpy (msg->buf + msg->len, buf, fragmentSize); + msg->len += fragmentSize; + it->second->nextFragmentNum++; + } + else if (fragmentNum < it->second->nextFragmentNum) + // duplicate fragment + LogPrint ("Duplicate fragment ", fragmentNum, " of message ", msgID, ". Ignored"); + else + { + // missing fragment + LogPrint ("Missing fragments from ", it->second->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID); + //TODO + } + + if (isLast) + { + delete it->second; + m_IncomleteMessages.erase (it); + } + } + else + // TODO: + LogPrint ("Unexpected follow-on fragment ", fragmentNum, " of message ", msgID); + } + else // first fragment + { + msg = NewI2NPMessage (); + memcpy (msg->GetSSUHeader (), buf, fragmentSize); + msg->len += fragmentSize - sizeof (I2NPHeaderShort); + } + + if (msg) + { + if (!fragmentNum && !isLast) + m_IncomleteMessages[msgID] = new IncompleteMessage (msg); + if (isLast) + { + m_Session.SendMsgAck (msgID); + msg->FromSSU (msgID); + if (m_Session.GetState () == eSessionStateEstablished) + i2p::HandleI2NPMessage (msg); + else + { + // we expect DeliveryStatus + if (msg->GetHeader ()->typeID == eI2NPDeliveryStatus) + { + LogPrint ("SSU session established"); + m_Session.Established (); + } + else + LogPrint ("SSU unexpected message ", (int)msg->GetHeader ()->typeID); + DeleteI2NPMessage (msg); + } + } + } + buf += fragmentSize; + } + } + +} +} + diff --git a/SSUData.h b/SSUData.h new file mode 100644 index 00000000..6f6c398d --- /dev/null +++ b/SSUData.h @@ -0,0 +1,48 @@ +#ifndef SSU_DATA_H__ +#define SSU_DATA_H__ + +#include +#include +#include "I2NPProtocol.h" + +namespace i2p +{ +namespace ssu +{ + + // data flags + const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02; + const uint8_t DATA_FLAG_WANT_REPLY = 0x04; + const uint8_t DATA_FLAG_REQUEST_PREVIOUS_ACKS = 0x08; + const uint8_t DATA_FLAG_EXPLICIT_CONGESTION_NOTIFICATION = 0x10; + const uint8_t DATA_FLAG_ACK_BITFIELDS_INCLUDED = 0x40; + const uint8_t DATA_FLAG_EXPLICIT_ACKS_INCLUDED = 0x80; + + class SSUSession; + class SSUData + { + public: + + SSUData (SSUSession& session); + ~SSUData (); + + void ProcessMessage (uint8_t * buf, size_t len); + + private: + + struct IncompleteMessage + { + I2NPMessage * msg; + uint8_t nextFragmentNum; + + IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (1) {}; + }; + + SSUSession& m_Session; + std::map m_IncomleteMessages; + }; +} +} + +#endif +