Browse Source

send batch of I2NP messages

pull/151/head
orignal 10 years ago
parent
commit
ea353ac3ba
  1. 11
      NTCPSession.cpp
  2. 2
      NTCPSession.h
  3. 14
      SSUSession.cpp
  4. 2
      SSUSession.h
  5. 2
      TransportSession.h
  6. 38
      Transports.cpp
  7. 6
      Transports.h
  8. 2
      TunnelGateway.cpp

11
NTCPSession.cpp

@ -617,6 +617,17 @@ namespace transport
if (msg) if (msg)
Send (msg); Send (msg);
} }
void NTCPSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
{
m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs));
}
void NTCPSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)
{
for (auto it: msgs)
if (it) Send (it);
}
void NTCPSession::ScheduleTermination () void NTCPSession::ScheduleTermination ()
{ {

2
NTCPSession.h

@ -61,6 +61,7 @@ namespace transport
void ClientLogin (); void ClientLogin ();
void ServerLogin (); void ServerLogin ();
void SendI2NPMessage (I2NPMessage * msg); void SendI2NPMessage (I2NPMessage * msg);
void SendI2NPMessages (const std::vector<I2NPMessage *>& msgs);
size_t GetNumSentBytes () const { return m_NumSentBytes; }; size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
@ -68,6 +69,7 @@ namespace transport
protected: protected:
void PostI2NPMessage (I2NPMessage * msg); void PostI2NPMessage (I2NPMessage * msg);
void PostI2NPMessages (std::vector<I2NPMessage *> msgs);
void Connected (); void Connected ();
void SendTimeSyncMessage (); void SendTimeSyncMessage ();
void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; } void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; }

14
SSUSession.cpp

@ -827,7 +827,19 @@ namespace transport
if (msg) if (msg)
m_Data.Send (msg); m_Data.Send (msg);
} }
void SSUSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
{
boost::asio::io_service& service = IsV6 () ? m_Server.GetServiceV6 () : m_Server.GetService ();
service.post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs));
}
void SSUSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)
{
for (auto it: msgs)
if (it) m_Data.Send (it);
}
void SSUSession::ProcessData (uint8_t * buf, size_t len) void SSUSession::ProcessData (uint8_t * buf, size_t len)
{ {
m_Data.ProcessMessage (buf, len); m_Data.ProcessMessage (buf, len);

2
SSUSession.h

@ -66,6 +66,7 @@ namespace transport
boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; };
bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); }; bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); };
void SendI2NPMessage (I2NPMessage * msg); void SendI2NPMessage (I2NPMessage * msg);
void SendI2NPMessages (const std::vector<I2NPMessage *>& msgs);
void SendPeerTest (); // Alice void SendPeerTest (); // Alice
SessionState GetState () const { return m_State; }; SessionState GetState () const { return m_State; };
@ -81,6 +82,7 @@ namespace transport
void CreateAESandMacKey (const uint8_t * pubKey); void CreateAESandMacKey (const uint8_t * pubKey);
void PostI2NPMessage (I2NPMessage * msg); void PostI2NPMessage (I2NPMessage * msg);
void PostI2NPMessages (std::vector<I2NPMessage *> msgs);
void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session
void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void SendSessionRequest (); void SendSessionRequest ();

2
TransportSession.h

@ -4,6 +4,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <vector>
#include "Identity.h" #include "Identity.h"
#include "RouterInfo.h" #include "RouterInfo.h"
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
@ -66,6 +67,7 @@ namespace transport
const i2p::data::IdentityEx& GetRemoteIdentity () { return m_RemoteIdentity; }; const i2p::data::IdentityEx& GetRemoteIdentity () { return m_RemoteIdentity; };
virtual void SendI2NPMessage (I2NPMessage * msg) = 0; virtual void SendI2NPMessage (I2NPMessage * msg) = 0;
virtual void SendI2NPMessages (const std::vector<I2NPMessage *>& msgs) = 0;
protected: protected:

38
Transports.cpp

@ -184,6 +184,11 @@ namespace transport
m_Service.post (std::bind (&Transports::PostMessage, this, ident, msg)); m_Service.post (std::bind (&Transports::PostMessage, this, ident, msg));
} }
void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector<i2p::I2NPMessage *>& msgs)
{
m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs));
}
void Transports::PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg) void Transports::PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg)
{ {
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
@ -210,6 +215,36 @@ namespace transport
it->second.delayedMessages.push_back (msg); it->second.delayedMessages.push_back (msg);
} }
void Transports::PostMessages (const i2p::data::IdentHash& ident, std::vector<i2p::I2NPMessage *> msgs)
{
if (ident == i2p::context.GetRouterInfo ().GetIdentHash ())
{
// we send it to ourself
for (auto it: msgs)
i2p::HandleI2NPMessage (it);
return;
}
auto it = m_Peers.find (ident);
if (it == m_Peers.end ())
{
auto r = netdb.FindRouter (ident);
it = m_Peers.insert (std::pair<i2p::data::IdentHash, Peer>(ident, { 0, r, nullptr})).first;
if (!ConnectToPeer (ident, it->second))
{
for (auto it1: msgs)
DeleteI2NPMessage (it1);
return;
}
}
if (it->second.session)
it->second.session->SendI2NPMessages (msgs);
else
{
for (auto it1: msgs)
it->second.delayedMessages.push_back (it1);
}
}
bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer) bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer)
{ {
if (peer.router) // we have RI already if (peer.router) // we have RI already
@ -365,8 +400,7 @@ namespace transport
if (it != m_Peers.end ()) if (it != m_Peers.end ())
{ {
it->second.session = session; it->second.session = session;
for (auto it1: it->second.delayedMessages) session->SendI2NPMessages (it->second.delayedMessages);
session->SendI2NPMessage (it1);
it->second.delayedMessages.clear (); it->second.delayedMessages.clear ();
} }
else // incoming connection else // incoming connection

6
Transports.h

@ -6,7 +6,7 @@
#include <condition_variable> #include <condition_variable>
#include <functional> #include <functional>
#include <map> #include <map>
#include <list> #include <vector>
#include <queue> #include <queue>
#include <string> #include <string>
#include <memory> #include <memory>
@ -56,7 +56,7 @@ namespace transport
int numAttempts; int numAttempts;
std::shared_ptr<const i2p::data::RouterInfo> router; std::shared_ptr<const i2p::data::RouterInfo> router;
std::shared_ptr<TransportSession> session; std::shared_ptr<TransportSession> session;
std::list<i2p::I2NPMessage *> delayedMessages; std::vector<i2p::I2NPMessage *> delayedMessages;
~Peer () ~Peer ()
{ {
@ -80,6 +80,7 @@ namespace transport
void ReuseDHKeysPair (DHKeysPair * pair); void ReuseDHKeysPair (DHKeysPair * pair);
void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void SendMessages (const i2p::data::IdentHash& ident, const std::vector<i2p::I2NPMessage *>& msgs);
void CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router); void CloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
void PeerConnected (std::shared_ptr<TransportSession> session); void PeerConnected (std::shared_ptr<TransportSession> session);
@ -91,6 +92,7 @@ namespace transport
void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident); void RequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr<const i2p::data::RouterInfo> r, const i2p::data::IdentHash& ident);
void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg);
void PostMessages (const i2p::data::IdentHash& ident, std::vector<i2p::I2NPMessage *> msgs);
void PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router); void PostCloseSession (std::shared_ptr<const i2p::data::RouterInfo> router);
bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer); bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer);

2
TunnelGateway.cpp

@ -188,9 +188,9 @@ namespace tunnel
{ {
m_Tunnel->EncryptTunnelMsg (tunnelMsg); m_Tunnel->EncryptTunnelMsg (tunnelMsg);
FillI2NPMessageHeader (tunnelMsg, eI2NPTunnelData); FillI2NPMessageHeader (tunnelMsg, eI2NPTunnelData);
i2p::transport::transports.SendMessage (m_Tunnel->GetNextIdentHash (), tunnelMsg);
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
} }
i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), tunnelMsgs);
m_Buffer.ClearTunnelDataMsgs (); m_Buffer.ClearTunnelDataMsgs ();
} }
} }

Loading…
Cancel
Save