Browse Source

Merge pull request #19 from orignal/master

Merge pull request from orignal/master
pull/49/head
chertov 11 years ago
parent
commit
57a520889a
  1. 12
      SSU.cpp
  2. 1
      SSU.h
  3. 47
      Streaming.cpp
  4. 39
      Streaming.h
  5. 44
      Transports.cpp

12
SSU.cpp

@ -796,6 +796,18 @@ namespace ssu
LogPrint ("SSU receive error: ", ecode.message ()); LogPrint ("SSU receive error: ", ecode.message ());
} }
SSUSession * SSUServer::FindSession (const i2p::data::RouterInfo * router)
{
if (!router) return nullptr;
auto address = router->GetSSUAddress ();
if (!address) return nullptr;
auto it = m_Sessions.find (boost::asio::ip::udp::endpoint (address->host, address->port));
if (it != m_Sessions.end ())
return it->second;
else
return nullptr;
}
SSUSession * SSUServer::GetSession (const i2p::data::RouterInfo * router) SSUSession * SSUServer::GetSession (const i2p::data::RouterInfo * router)
{ {
SSUSession * session = nullptr; SSUSession * session = nullptr;

1
SSU.h

@ -126,6 +126,7 @@ namespace ssu
void Start (); void Start ();
void Stop (); void Stop ();
SSUSession * GetSession (const i2p::data::RouterInfo * router); SSUSession * GetSession (const i2p::data::RouterInfo * router);
SSUSession * FindSession (const i2p::data::RouterInfo * router);
void DeleteSession (SSUSession * session); void DeleteSession (SSUSession * session);
void DeleteAllSessions (); void DeleteAllSessions ();

47
Streaming.cpp

@ -1,6 +1,5 @@
#include <fstream> #include <fstream>
#include <algorithm> #include <algorithm>
#include <boost/bind.hpp>
#include <cryptopp/gzip.h> #include <cryptopp/gzip.h>
#include "Log.h" #include "Log.h"
#include "RouterInfo.h" #include "RouterInfo.h"
@ -19,7 +18,7 @@ namespace stream
const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false),
m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote),
m_OutboundTunnel (nullptr) m_OutboundTunnel (nullptr), m_ReceiveTimer (m_Service)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -27,6 +26,7 @@ namespace stream
Stream::~Stream () Stream::~Stream ()
{ {
m_ReceiveTimer.cancel ();
while (auto packet = m_ReceiveQueue.Get ()) while (auto packet = m_ReceiveQueue.Get ())
delete packet; delete packet;
for (auto it: m_SavedPackets) for (auto it: m_SavedPackets)
@ -57,7 +57,10 @@ namespace stream
} }
else else
break; break;
} }
// send ack for last message
SendQuickAck ();
} }
else else
{ {
@ -117,13 +120,13 @@ namespace stream
delete packet; delete packet;
m_LastReceivedSequenceNumber = receivedSeqn; m_LastReceivedSequenceNumber = receivedSeqn;
SendQuickAck ();
if (flags & PACKET_FLAG_CLOSE) if (flags & PACKET_FLAG_CLOSE)
{ {
LogPrint ("Closed"); LogPrint ("Closed");
m_IsOpen = false; m_IsOpen = false;
m_ReceiveQueue.WakeUp (); m_ReceiveQueue.WakeUp ();
m_ReceiveTimer.cancel ();
} }
} }
@ -141,7 +144,9 @@ namespace stream
void Stream::ConnectAndSend (uint8_t * buf, size_t len) void Stream::ConnectAndSend (uint8_t * buf, size_t len)
{ {
m_IsOpen = true; m_IsOpen = true;
uint8_t packet[STREAMING_MTU]; Packet * p = new Packet ();
uint8_t * packet = p->GetBuffer ();
// TODO: implement setters
size_t size = 0; size_t size = 0;
*(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID);
size += 4; // sendStreamID size += 4; // sendStreamID
@ -167,18 +172,18 @@ namespace stream
size += 2; // max packet size size += 2; // max packet size
uint8_t * signature = packet + size; // set it later uint8_t * signature = packet + size; // set it later
memset (signature, 0, 40); // zeroes for now memset (signature, 0, 40); // zeroes for now
size += 40; // signature size += 40; // signature
memcpy (packet + size, buf, len); memcpy (packet + size, buf, len);
size += len; // payload size += len; // payload
m_LocalDestination->Sign (packet, size, signature); m_LocalDestination->Sign (packet, size, signature);
p->len = size;
SendPacket (packet, size); m_Service.post (boost::bind (&Stream::SendPacket, this, p));
} }
void Stream::SendQuickAck () void Stream::SendQuickAck ()
{ {
uint8_t packet[STREAMING_MTU]; uint8_t packet[MAX_PACKET_SIZE];
size_t size = 0; size_t size = 0;
*(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID);
size += 4; // sendStreamID size += 4; // sendStreamID
@ -195,7 +200,7 @@ namespace stream
size += 2; // flags size += 2; // flags
*(uint16_t *)(packet + size) = 0; // no options *(uint16_t *)(packet + size) = 0; // no options
size += 2; // options size size += 2; // options size
if (SendPacket (packet, size)) if (SendPacket (packet, size))
LogPrint ("Quick Ack sent"); LogPrint ("Quick Ack sent");
} }
@ -205,7 +210,7 @@ namespace stream
if (m_IsOpen) if (m_IsOpen)
{ {
m_IsOpen = false; m_IsOpen = false;
uint8_t packet[STREAMING_MTU]; uint8_t packet[MAX_PACKET_SIZE];
size_t size = 0; size_t size = 0;
*(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID);
size += 4; // sendStreamID size += 4; // sendStreamID
@ -226,7 +231,7 @@ namespace stream
memset (packet + size, 0, 40); memset (packet + size, 0, 40);
size += 40; // signature size += 40; // signature
m_LocalDestination->Sign (packet, size, signature); m_LocalDestination->Sign (packet, size, signature);
if (SendPacket (packet, size)) if (SendPacket (packet, size))
LogPrint ("FIN sent"); LogPrint ("FIN sent");
m_ReceiveQueue.WakeUp (); m_ReceiveQueue.WakeUp ();
@ -266,7 +271,19 @@ namespace stream
return pos; return pos;
} }
bool Stream::SendPacket (uint8_t * packet, size_t size) bool Stream::SendPacket (Packet * packet)
{
if (packet)
{
bool ret = SendPacket (packet->GetBuffer (), packet->GetLength ());
delete packet;
return ret;
}
else
return false;
}
bool Stream::SendPacket (const uint8_t * buf, size_t len)
{ {
const I2NPMessage * leaseSet = nullptr; const I2NPMessage * leaseSet = nullptr;
if (m_LeaseSetUpdated) if (m_LeaseSetUpdated)
@ -275,7 +292,7 @@ namespace stream
m_LeaseSetUpdated = false; m_LeaseSetUpdated = false;
} }
I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet,
CreateDataMessage (this, packet, size), leaseSet); CreateDataMessage (this, buf, len), leaseSet);
if (!m_OutboundTunnel) if (!m_OutboundTunnel)
m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel ();
if (m_OutboundTunnel) if (m_OutboundTunnel)
@ -540,7 +557,7 @@ namespace stream
LogPrint ("Data: protocol ", buf[9], " is not supported"); LogPrint ("Data: protocol ", buf[9], " is not supported");
} }
I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len) I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len)
{ {
I2NPMessage * msg = NewI2NPMessage (); I2NPMessage * msg = NewI2NPMessage ();
CryptoPP::Gzip compressor; CryptoPP::Gzip compressor;

39
Streaming.h

@ -7,6 +7,7 @@
#include <set> #include <set>
#include <thread> #include <thread>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <cryptopp/dsa.h> #include <cryptopp/dsa.h>
#include "I2PEndian.h" #include "I2PEndian.h"
#include "Queue.h" #include "Queue.h"
@ -37,7 +38,7 @@ namespace stream
struct Packet struct Packet
{ {
uint8_t buf[1754]; uint8_t buf[MAX_PACKET_SIZE];
size_t len, offset; size_t len, offset;
Packet (): len (0), offset (0) {}; Packet (): len (0), offset (0) {};
@ -80,21 +81,28 @@ namespace stream
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds
size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired
template<typename Buffer, typename ReceiveHandler>
void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
void Close (); void Close ();
void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; }; void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; };
private: private:
void ConnectAndSend (uint8_t * buf, size_t len); void ConnectAndSend (uint8_t * buf, size_t len);
void SendQuickAck (); void SendQuickAck ();
bool SendPacket (uint8_t * packet, size_t size); bool SendPacket (Packet * packet);
bool SendPacket (const uint8_t * buf, size_t len);
void SavePacket (Packet * packet); void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet); void ProcessPacket (Packet * packet);
void UpdateCurrentRemoteLease (); void UpdateCurrentRemoteLease ();
template<typename Buffer, typename ReceiveHandler>
void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler);
private: private:
boost::asio::io_service& m_Service; boost::asio::io_service& m_Service;
@ -106,6 +114,7 @@ namespace stream
i2p::util::Queue<Packet> m_ReceiveQueue; i2p::util::Queue<Packet> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
i2p::tunnel::OutboundTunnel * m_OutboundTunnel; i2p::tunnel::OutboundTunnel * m_OutboundTunnel;
boost::asio::deadline_timer m_ReceiveTimer;
}; };
class StreamingDestination: public i2p::data::LocalDestination class StreamingDestination: public i2p::data::LocalDestination
@ -183,7 +192,29 @@ namespace stream
// assuming data is I2CP message // assuming data is I2CP message
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len);
//-------------------------------------------------
template<typename Buffer, typename ReceiveHandler>
void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout)
{
m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout));
m_ReceiveTimer.async_wait (boost::bind (&Stream::HandleReceiveTimer,
this, boost::asio::placeholders::error, buffer, handler));
}
template<typename Buffer, typename ReceiveHandler>
void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler)
{
// TODO:
if (ecode == boost::asio::error::operation_aborted)
// timeout not expired
handler (boost::system::error_code (), 0);
else
// timeout expired
handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), 0);
}
} }
} }

44
Transports.cpp

@ -156,34 +156,36 @@ namespace i2p
void Transports::PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg) void Transports::PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg)
{ {
auto session = FindNTCPSession (ident); auto session = FindNTCPSession (ident);
if (!session) if (session)
session->SendI2NPMessage (msg);
else
{ {
RouterInfo * r = netdb.FindRouter (ident); RouterInfo * r = netdb.FindRouter (ident);
if (r) if (r)
{ {
auto address = r->GetNTCPAddress (); auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (r) : nullptr;
if (address) if (ssuSession)
{ ssuSession->SendI2NPMessage (msg);
session = new i2p::ntcp::NTCPClient (m_Service, address->host, address->port, *r);
AddNTCPSession (session);
}
else else
{ {
// SSU always have lower prioprity than NTCP // existing session not found. create new
// TODO: it shouldn't // try NTCP first
LogPrint ("No NTCP addresses available. Trying SSU"); auto address = r->GetNTCPAddress ();
address = r->GetSSUAddress (); if (address)
if (address && m_SSUServer) {
{ auto s = new i2p::ntcp::NTCPClient (m_Service, address->host, address->port, *r);
auto s = m_SSUServer->GetSession (r); AddNTCPSession (s);
s->SendI2NPMessage (msg);
}
else
{
// then SSU
auto s = m_SSUServer ? m_SSUServer->GetSession (r) : nullptr;
if (s) if (s)
{
s->SendI2NPMessage (msg); s->SendI2NPMessage (msg);
return; else
} LogPrint ("No NTCP and SSU addresses available");
} }
else
LogPrint ("No SSU addresses available");
} }
} }
else else
@ -192,10 +194,6 @@ namespace i2p
i2p::data::netdb.RequestDestination (ident); i2p::data::netdb.RequestDestination (ident);
} }
} }
if (session)
session->SendI2NPMessage (msg);
else
LogPrint ("Session not found");
} }
void Transports::DetectExternalIP () void Transports::DetectExternalIP ()

Loading…
Cancel
Save