Browse Source

use shared_ptr for I2NP messages through tunnels

pull/210/head
orignal 10 years ago
parent
commit
4ed7e29896
  1. 2
      Datagram.cpp
  2. 4
      Destination.cpp
  3. 30
      I2NPProtocol.cpp
  4. 6
      I2NPProtocol.h
  5. 10
      NetDb.cpp
  6. 2
      RouterContext.cpp
  7. 2
      Streaming.cpp
  8. 18
      TransitTunnel.cpp
  9. 12
      TransitTunnel.h
  10. 7
      Transports.cpp
  11. 36
      Tunnel.cpp
  12. 16
      Tunnel.h
  13. 8
      TunnelBase.h
  14. 37
      TunnelEndpoint.cpp
  15. 6
      TunnelEndpoint.h
  16. 6
      TunnelGateway.cpp

2
Datagram.cpp

@ -65,7 +65,7 @@ namespace datagram
{ {
i2p::tunnel::eDeliveryTypeTunnel, i2p::tunnel::eDeliveryTypeTunnel,
leases[i].tunnelGateway, leases[i].tunnelID, leases[i].tunnelGateway, leases[i].tunnelID,
garlic ToSharedI2NPMessage (garlic)
}); });
outboundTunnel->SendTunnelDataMsg (msgs); outboundTunnel->SendTunnelDataMsg (msgs);
} }

4
Destination.cpp

@ -240,7 +240,7 @@ namespace client
HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET));
break; break;
default: default:
i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); i2p::HandleI2NPMessage (ToSharedI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)));
} }
} }
@ -589,7 +589,7 @@ namespace client
i2p::tunnel::TunnelMessageBlock i2p::tunnel::TunnelMessageBlock
{ {
i2p::tunnel::eDeliveryTypeRouter, i2p::tunnel::eDeliveryTypeRouter,
nextFloodfill->GetIdentHash (), 0, msg nextFloodfill->GetIdentHash (), 0, ToSharedI2NPMessage (msg)
} }
}); });
request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT)); request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT));

30
I2NPProtocol.cpp

@ -445,7 +445,7 @@ namespace i2p
return msg; return msg;
} }
I2NPMessage * CreateTunnelGatewayMsg (uint32_t tunnelID, I2NPMessage * msg) std::shared_ptr<I2NPMessage> CreateTunnelGatewayMsg (uint32_t tunnelID, std::shared_ptr<I2NPMessage> msg)
{ {
if (msg->offset >= I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE) if (msg->offset >= I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE)
{ {
@ -456,14 +456,13 @@ namespace i2p
htobe16buf (payload + TUNNEL_GATEWAY_HEADER_LENGTH_OFFSET, len); htobe16buf (payload + TUNNEL_GATEWAY_HEADER_LENGTH_OFFSET, len);
msg->offset -= (I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE); msg->offset -= (I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE);
msg->len = msg->offset + I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE +len; msg->len = msg->offset + I2NP_HEADER_SIZE + TUNNEL_GATEWAY_HEADER_SIZE +len;
FillI2NPMessageHeader (msg, eI2NPTunnelGateway); FillI2NPMessageHeader (msg.get(), eI2NPTunnelGateway); // TODO
return msg; return msg;
} }
else else
{ {
I2NPMessage * msg1 = CreateTunnelGatewayMsg (tunnelID, msg->GetBuffer (), msg->GetLength ()); I2NPMessage * msg1 = CreateTunnelGatewayMsg (tunnelID, msg->GetBuffer (), msg->GetLength ());
DeleteI2NPMessage (msg); return ToSharedI2NPMessage (msg1);
return msg1;
} }
} }
@ -522,7 +521,7 @@ namespace i2p
} }
} }
void HandleI2NPMessage (I2NPMessage * msg) void HandleI2NPMessage (std::shared_ptr<I2NPMessage> msg)
{ {
if (msg) if (msg)
{ {
@ -539,32 +538,30 @@ namespace i2p
case eI2NPGarlic: case eI2NPGarlic:
{ {
LogPrint ("Garlic"); LogPrint ("Garlic");
auto sharedMsg = ToSharedI2NPMessage (msg);
if (msg->from) if (msg->from)
{ {
if (msg->from->GetTunnelPool ()) if (msg->from->GetTunnelPool ())
msg->from->GetTunnelPool ()->ProcessGarlicMessage (sharedMsg); msg->from->GetTunnelPool ()->ProcessGarlicMessage (msg);
else else
LogPrint (eLogInfo, "Local destination for garlic doesn't exist anymore"); LogPrint (eLogInfo, "Local destination for garlic doesn't exist anymore");
} }
else else
i2p::context.ProcessGarlicMessage (sharedMsg); i2p::context.ProcessGarlicMessage (msg);
break; break;
} }
case eI2NPDatabaseStore: case eI2NPDatabaseStore:
case eI2NPDatabaseSearchReply: case eI2NPDatabaseSearchReply:
case eI2NPDatabaseLookup: case eI2NPDatabaseLookup:
// forward to netDb // forward to netDb
i2p::data::netdb.PostI2NPMsg (ToSharedI2NPMessage (msg)); i2p::data::netdb.PostI2NPMsg (msg);
break; break;
case eI2NPDeliveryStatus: case eI2NPDeliveryStatus:
{ {
LogPrint ("DeliveryStatus"); LogPrint ("DeliveryStatus");
auto sharedMsg = ToSharedI2NPMessage (msg);
if (msg->from && msg->from->GetTunnelPool ()) if (msg->from && msg->from->GetTunnelPool ())
msg->from->GetTunnelPool ()->ProcessDeliveryStatus (sharedMsg); msg->from->GetTunnelPool ()->ProcessDeliveryStatus (msg);
else else
i2p::context.ProcessDeliveryStatusMessage (sharedMsg); i2p::context.ProcessDeliveryStatusMessage (msg);
break; break;
} }
case eI2NPVariableTunnelBuild: case eI2NPVariableTunnelBuild:
@ -576,7 +573,6 @@ namespace i2p
break; break;
default: default:
HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ()); HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ());
DeleteI2NPMessage (msg);
} }
} }
} }
@ -586,20 +582,20 @@ namespace i2p
Flush (); Flush ();
} }
void I2NPMessagesHandler::PutNextMessage (I2NPMessage * msg) void I2NPMessagesHandler::PutNextMessage (I2NPMessage * msg)
{ {
if (msg) if (msg)
{ {
switch (msg->GetTypeID ()) switch (msg->GetTypeID ())
{ {
case eI2NPTunnelData: case eI2NPTunnelData:
m_TunnelMsgs.push_back (msg); m_TunnelMsgs.push_back (ToSharedI2NPMessage (msg));
break; break;
case eI2NPTunnelGateway: case eI2NPTunnelGateway:
m_TunnelGatewayMsgs.push_back (msg); m_TunnelGatewayMsgs.push_back (ToSharedI2NPMessage (msg));
break; break;
default: default:
HandleI2NPMessage (msg); HandleI2NPMessage (ToSharedI2NPMessage (msg));
} }
} }
} }

6
I2NPProtocol.h

@ -225,11 +225,11 @@ namespace tunnel
I2NPMessage * CreateTunnelGatewayMsg (uint32_t tunnelID, const uint8_t * buf, size_t len); I2NPMessage * CreateTunnelGatewayMsg (uint32_t tunnelID, const uint8_t * buf, size_t len);
I2NPMessage * CreateTunnelGatewayMsg (uint32_t tunnelID, I2NPMessageType msgType, I2NPMessage * CreateTunnelGatewayMsg (uint32_t tunnelID, I2NPMessageType msgType,
const uint8_t * buf, size_t len, uint32_t replyMsgID = 0); const uint8_t * buf, size_t len, uint32_t replyMsgID = 0);
I2NPMessage * CreateTunnelGatewayMsg (uint32_t tunnelID, I2NPMessage * msg); std::shared_ptr<I2NPMessage> CreateTunnelGatewayMsg (uint32_t tunnelID, std::shared_ptr<I2NPMessage> msg);
size_t GetI2NPMessageLength (const uint8_t * msg); size_t GetI2NPMessageLength (const uint8_t * msg);
void HandleI2NPMessage (uint8_t * msg, size_t len); void HandleI2NPMessage (uint8_t * msg, size_t len);
void HandleI2NPMessage (I2NPMessage * msg); void HandleI2NPMessage (std::shared_ptr<I2NPMessage> msg);
class I2NPMessagesHandler class I2NPMessagesHandler
{ {
@ -241,7 +241,7 @@ namespace tunnel
private: private:
std::vector<I2NPMessage *> m_TunnelMsgs, m_TunnelGatewayMsgs; std::vector<std::shared_ptr<I2NPMessage> > m_TunnelMsgs, m_TunnelGatewayMsgs;
}; };
} }

10
NetDb.cpp

@ -597,7 +597,7 @@ namespace data
{ {
i2p::tunnel::eDeliveryTypeRouter, i2p::tunnel::eDeliveryTypeRouter,
nextFloodfill->GetIdentHash (), 0, nextFloodfill->GetIdentHash (), 0,
CreateDatabaseStoreMsg () ToSharedI2NPMessage (CreateDatabaseStoreMsg ())
}); });
// request destination // request destination
@ -606,7 +606,7 @@ namespace data
msgs.push_back (i2p::tunnel::TunnelMessageBlock msgs.push_back (i2p::tunnel::TunnelMessageBlock
{ {
i2p::tunnel::eDeliveryTypeRouter, i2p::tunnel::eDeliveryTypeRouter,
nextFloodfill->GetIdentHash (), 0, msg nextFloodfill->GetIdentHash (), 0, ToSharedI2NPMessage (msg)
}); });
deleteDest = false; deleteDest = false;
} }
@ -763,7 +763,7 @@ namespace data
if (outbound) if (outbound)
outbound->SendTunnelDataMsg (buf+32, replyTunnelID, replyMsg); outbound->SendTunnelDataMsg (buf+32, replyTunnelID, replyMsg);
else else
transports.SendMessage (buf+32, i2p::CreateTunnelGatewayMsg (replyTunnelID, replyMsg)); transports.SendMessage (buf+32, i2p::CreateTunnelGatewayMsg (replyTunnelID, ToSharedI2NPMessage(replyMsg)));
} }
else else
transports.SendMessage (buf+32, replyMsg); transports.SendMessage (buf+32, replyMsg);
@ -804,13 +804,13 @@ namespace data
{ {
i2p::tunnel::eDeliveryTypeRouter, i2p::tunnel::eDeliveryTypeRouter,
floodfill->GetIdentHash (), 0, floodfill->GetIdentHash (), 0,
CreateDatabaseStoreMsg () // tell floodfill about us ToSharedI2NPMessage (CreateDatabaseStoreMsg ()) // tell floodfill about us
}); });
msgs.push_back (i2p::tunnel::TunnelMessageBlock msgs.push_back (i2p::tunnel::TunnelMessageBlock
{ {
i2p::tunnel::eDeliveryTypeRouter, i2p::tunnel::eDeliveryTypeRouter,
floodfill->GetIdentHash (), 0, floodfill->GetIdentHash (), 0,
dest->CreateRequestMessage (floodfill, inbound) // explore ToSharedI2NPMessage (dest->CreateRequestMessage (floodfill, inbound)) // explore
}); });
} }
else else

2
RouterContext.cpp

@ -296,7 +296,7 @@ namespace i2p
void RouterContext::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from) void RouterContext::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr<i2p::tunnel::InboundTunnel> from)
{ {
i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); i2p::HandleI2NPMessage (ToSharedI2NPMessage(CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)));
} }
void RouterContext::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg) void RouterContext::ProcessGarlicMessage (std::shared_ptr<I2NPMessage> msg)

2
Streaming.cpp

@ -617,7 +617,7 @@ namespace stream
{ {
i2p::tunnel::eDeliveryTypeTunnel, i2p::tunnel::eDeliveryTypeTunnel,
m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID, m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID,
msg ToSharedI2NPMessage (msg)
}); });
m_NumSentBytes += it->GetLength (); m_NumSentBytes += it->GetLength ();
} }

18
TransitTunnel.cpp

@ -20,7 +20,7 @@ namespace tunnel
m_Encryption.SetKeys (layerKey, ivKey); m_Encryption.SetKeys (layerKey, ivKey);
} }
void TransitTunnel::EncryptTunnelMsg (I2NPMessage * tunnelMsg) void TransitTunnel::EncryptTunnelMsg (std::shared_ptr<I2NPMessage> tunnelMsg)
{ {
m_Encryption.Encrypt (tunnelMsg->GetPayload () + 4); m_Encryption.Encrypt (tunnelMsg->GetPayload () + 4);
} }
@ -29,14 +29,14 @@ namespace tunnel
{ {
} }
void TransitTunnelParticipant::HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg) void TransitTunnelParticipant::HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg)
{ {
EncryptTunnelMsg (tunnelMsg); EncryptTunnelMsg (tunnelMsg);
m_NumTransmittedBytes += tunnelMsg->GetLength (); m_NumTransmittedBytes += tunnelMsg->GetLength ();
htobe32buf (tunnelMsg->GetPayload (), GetNextTunnelID ()); htobe32buf (tunnelMsg->GetPayload (), GetNextTunnelID ());
FillI2NPMessageHeader (tunnelMsg, eI2NPTunnelData); FillI2NPMessageHeader (tunnelMsg.get (), eI2NPTunnelData); // TODO
m_TunnelDataMsgs.push_back (ToSharedI2NPMessage (tunnelMsg)); m_TunnelDataMsgs.push_back (tunnelMsg);
} }
void TransitTunnelParticipant::FlushTunnelDataMsgs () void TransitTunnelParticipant::FlushTunnelDataMsgs ()
@ -51,19 +51,17 @@ namespace tunnel
} }
} }
void TransitTunnel::SendTunnelDataMsg (i2p::I2NPMessage * msg) void TransitTunnel::SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg)
{ {
LogPrint (eLogError, "We are not a gateway for transit tunnel ", m_TunnelID); LogPrint (eLogError, "We are not a gateway for transit tunnel ", m_TunnelID);
i2p::DeleteI2NPMessage (msg);
} }
void TransitTunnel::HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg) void TransitTunnel::HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg)
{ {
LogPrint (eLogError, "Incoming tunnel message is not supported ", m_TunnelID); LogPrint (eLogError, "Incoming tunnel message is not supported ", m_TunnelID);
DeleteI2NPMessage (tunnelMsg);
} }
void TransitTunnelGateway::SendTunnelDataMsg (i2p::I2NPMessage * msg) void TransitTunnelGateway::SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg)
{ {
TunnelMessageBlock block; TunnelMessageBlock block;
block.deliveryType = eDeliveryTypeLocal; block.deliveryType = eDeliveryTypeLocal;
@ -78,7 +76,7 @@ namespace tunnel
m_Gateway.SendBuffer (); m_Gateway.SendBuffer ();
} }
void TransitTunnelEndpoint::HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg) void TransitTunnelEndpoint::HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg)
{ {
EncryptTunnelMsg (tunnelMsg); EncryptTunnelMsg (tunnelMsg);

12
TransitTunnel.h

@ -28,9 +28,9 @@ namespace tunnel
uint32_t GetTunnelID () const { return m_TunnelID; }; uint32_t GetTunnelID () const { return m_TunnelID; };
// implements TunnelBase // implements TunnelBase
void SendTunnelDataMsg (i2p::I2NPMessage * msg); void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg);
void HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg); void HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg);
void EncryptTunnelMsg (I2NPMessage * tunnelMsg); void EncryptTunnelMsg (std::shared_ptr<I2NPMessage> tunnelMsg);
uint32_t GetNextTunnelID () const { return m_NextTunnelID; }; uint32_t GetNextTunnelID () const { return m_NextTunnelID; };
const i2p::data::IdentHash& GetNextIdentHash () const { return m_NextIdent; }; const i2p::data::IdentHash& GetNextIdentHash () const { return m_NextIdent; };
@ -54,7 +54,7 @@ namespace tunnel
~TransitTunnelParticipant (); ~TransitTunnelParticipant ();
size_t GetNumTransmittedBytes () const { return m_NumTransmittedBytes; }; size_t GetNumTransmittedBytes () const { return m_NumTransmittedBytes; };
void HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg); void HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg);
void FlushTunnelDataMsgs (); void FlushTunnelDataMsgs ();
private: private:
@ -73,7 +73,7 @@ namespace tunnel
TransitTunnel (receiveTunnelID, nextIdent, nextTunnelID, TransitTunnel (receiveTunnelID, nextIdent, nextTunnelID,
layerKey, ivKey), m_Gateway(this) {}; layerKey, ivKey), m_Gateway(this) {};
void SendTunnelDataMsg (i2p::I2NPMessage * msg); void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg);
void FlushTunnelDataMsgs (); void FlushTunnelDataMsgs ();
size_t GetNumTransmittedBytes () const { return m_Gateway.GetNumSentBytes (); }; size_t GetNumTransmittedBytes () const { return m_Gateway.GetNumSentBytes (); };
@ -93,7 +93,7 @@ namespace tunnel
TransitTunnel (receiveTunnelID, nextIdent, nextTunnelID, layerKey, ivKey), TransitTunnel (receiveTunnelID, nextIdent, nextTunnelID, layerKey, ivKey),
m_Endpoint (false) {}; // transit endpoint is always outbound m_Endpoint (false) {}; // transit endpoint is always outbound
void HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg); void HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg);
size_t GetNumTransmittedBytes () const { return m_Endpoint.GetNumReceivedBytes (); } size_t GetNumTransmittedBytes () const { return m_Endpoint.GetNumReceivedBytes (); }
private: private:

7
Transports.cpp

@ -242,12 +242,7 @@ namespace transport
{ {
// we send it to ourself // we send it to ourself
for (auto it: msgs) for (auto it: msgs)
{ i2p::HandleI2NPMessage (it);
// TODO:
auto m = NewI2NPMessage ();
*m = *(it);
i2p::HandleI2NPMessage (m);
}
return; return;
} }
auto it = m_Peers.find (ident); auto it = m_Peers.find (ident);

36
Tunnel.cpp

@ -31,7 +31,7 @@ namespace tunnel
CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator ();
auto numHops = m_Config->GetNumHops (); auto numHops = m_Config->GetNumHops ();
int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : numHops; int numRecords = numHops <= STANDARD_NUM_RECORDS ? STANDARD_NUM_RECORDS : numHops;
I2NPMessage * msg = NewI2NPShortMessage (); auto msg = NewI2NPShortMessage ();
*msg->GetPayload () = numRecords; *msg->GetPayload () = numRecords;
msg->len += numRecords*TUNNEL_BUILD_RECORD_SIZE + 1; msg->len += numRecords*TUNNEL_BUILD_RECORD_SIZE + 1;
@ -140,7 +140,7 @@ namespace tunnel
return established; return established;
} }
void Tunnel::EncryptTunnelMsg (I2NPMessage * tunnelMsg) void Tunnel::EncryptTunnelMsg (std::shared_ptr<I2NPMessage> tunnelMsg)
{ {
uint8_t * payload = tunnelMsg->GetPayload () + 4; uint8_t * payload = tunnelMsg->GetPayload () + 4;
TunnelHopConfig * hop = m_Config->GetLastHop (); TunnelHopConfig * hop = m_Config->GetLastHop ();
@ -151,13 +151,12 @@ namespace tunnel
} }
} }
void Tunnel::SendTunnelDataMsg (i2p::I2NPMessage * msg) void Tunnel::SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg)
{ {
LogPrint (eLogInfo, "Can't send I2NP messages without delivery instructions"); LogPrint (eLogInfo, "Can't send I2NP messages without delivery instructions");
DeleteI2NPMessage (msg);
} }
void InboundTunnel::HandleTunnelDataMsg (I2NPMessage * msg) void InboundTunnel::HandleTunnelDataMsg (std::shared_ptr<I2NPMessage> msg)
{ {
if (IsFailed ()) SetState (eTunnelStateEstablished); // incoming messages means a tunnel is alive if (IsFailed ()) SetState (eTunnelStateEstablished); // incoming messages means a tunnel is alive
msg->from = shared_from_this (); msg->from = shared_from_this ();
@ -181,7 +180,7 @@ namespace tunnel
} }
else else
block.deliveryType = eDeliveryTypeLocal; block.deliveryType = eDeliveryTypeLocal;
block.data = msg; block.data = ToSharedI2NPMessage (msg);
std::unique_lock<std::mutex> l(m_SendMutex); std::unique_lock<std::mutex> l(m_SendMutex);
m_Gateway.SendTunnelDataMsg (block); m_Gateway.SendTunnelDataMsg (block);
@ -195,10 +194,9 @@ namespace tunnel
m_Gateway.SendBuffer (); m_Gateway.SendBuffer ();
} }
void OutboundTunnel::HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg) void OutboundTunnel::HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg)
{ {
LogPrint (eLogError, "Incoming message for outbound tunnel ", GetTunnelID ()); LogPrint (eLogError, "Incoming message for outbound tunnel ", GetTunnelID ());
DeleteI2NPMessage (tunnelMsg);
} }
Tunnels tunnels; Tunnels tunnels;
@ -352,7 +350,7 @@ namespace tunnel
{ {
try try
{ {
I2NPMessage * msg = m_Queue.GetNextWithTimeout (1000); // 1 sec auto msg = m_Queue.GetNextWithTimeout (1000); // 1 sec
if (msg) if (msg)
{ {
uint32_t prevTunnelID = 0, tunnelID = 0; uint32_t prevTunnelID = 0, tunnelID = 0;
@ -383,27 +381,18 @@ namespace tunnel
else // tunnel gateway assumed else // tunnel gateway assumed
HandleTunnelGatewayMsg (tunnel, msg); HandleTunnelGatewayMsg (tunnel, msg);
} }
else else
{
LogPrint (eLogWarning, "Tunnel ", tunnelID, " not found"); LogPrint (eLogWarning, "Tunnel ", tunnelID, " not found");
DeleteI2NPMessage (msg);
}
break; break;
} }
case eI2NPVariableTunnelBuild: case eI2NPVariableTunnelBuild:
case eI2NPVariableTunnelBuildReply: case eI2NPVariableTunnelBuildReply:
case eI2NPTunnelBuild: case eI2NPTunnelBuild:
case eI2NPTunnelBuildReply: case eI2NPTunnelBuildReply:
{
HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ()); HandleI2NPMessage (msg->GetBuffer (), msg->GetLength ());
DeleteI2NPMessage (msg); break;
break;
}
default: default:
{
LogPrint (eLogError, "Unexpected messsage type ", (int)typeID); LogPrint (eLogError, "Unexpected messsage type ", (int)typeID);
DeleteI2NPMessage (msg);
}
} }
msg = m_Queue.Get (); msg = m_Queue.Get ();
@ -432,12 +421,11 @@ namespace tunnel
} }
} }
void Tunnels::HandleTunnelGatewayMsg (TunnelBase * tunnel, I2NPMessage * msg) void Tunnels::HandleTunnelGatewayMsg (TunnelBase * tunnel, std::shared_ptr<I2NPMessage> msg)
{ {
if (!tunnel) if (!tunnel)
{ {
LogPrint (eLogError, "Missing tunnel for TunnelGateway"); LogPrint (eLogError, "Missing tunnel for TunnelGateway");
i2p::DeleteI2NPMessage (msg);
return; return;
} }
const uint8_t * payload = msg->GetPayload (); const uint8_t * payload = msg->GetPayload ();
@ -661,12 +649,12 @@ namespace tunnel
} }
} }
void Tunnels::PostTunnelData (I2NPMessage * msg) void Tunnels::PostTunnelData (std::shared_ptr<I2NPMessage> msg)
{ {
if (msg) m_Queue.Put (msg); if (msg) m_Queue.Put (msg);
} }
void Tunnels::PostTunnelData (const std::vector<I2NPMessage *>& msgs) void Tunnels::PostTunnelData (const std::vector<std::shared_ptr<I2NPMessage> >& msgs)
{ {
m_Queue.Put (msgs); m_Queue.Put (msgs);
} }

16
Tunnel.h

@ -64,8 +64,8 @@ namespace tunnel
bool HandleTunnelBuildResponse (uint8_t * msg, size_t len); bool HandleTunnelBuildResponse (uint8_t * msg, size_t len);
// implements TunnelBase // implements TunnelBase
void SendTunnelDataMsg (i2p::I2NPMessage * msg); void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg);
void EncryptTunnelMsg (I2NPMessage * tunnelMsg); void EncryptTunnelMsg (std::shared_ptr<I2NPMessage> tunnelMsg);
uint32_t GetNextTunnelID () const { return m_Config->GetFirstHop ()->tunnelID; }; uint32_t GetNextTunnelID () const { return m_Config->GetFirstHop ()->tunnelID; };
const i2p::data::IdentHash& GetNextIdentHash () const { return m_Config->GetFirstHop ()->router->GetIdentHash (); }; const i2p::data::IdentHash& GetNextIdentHash () const { return m_Config->GetFirstHop ()->router->GetIdentHash (); };
@ -90,7 +90,7 @@ namespace tunnel
size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); }; size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); };
// implements TunnelBase // implements TunnelBase
void HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg); void HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg);
uint32_t GetTunnelID () const { return GetNextTunnelID (); }; uint32_t GetTunnelID () const { return GetNextTunnelID (); };
private: private:
@ -104,7 +104,7 @@ namespace tunnel
public: public:
InboundTunnel (std::shared_ptr<const TunnelConfig> config): Tunnel (config), m_Endpoint (true) {}; InboundTunnel (std::shared_ptr<const TunnelConfig> config): Tunnel (config), m_Endpoint (true) {};
void HandleTunnelDataMsg (I2NPMessage * msg); void HandleTunnelDataMsg (std::shared_ptr<I2NPMessage> msg);
size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); }; size_t GetNumReceivedBytes () const { return m_Endpoint.GetNumReceivedBytes (); };
// implements TunnelBase // implements TunnelBase
@ -135,8 +135,8 @@ namespace tunnel
void AddTransitTunnel (TransitTunnel * tunnel); void AddTransitTunnel (TransitTunnel * tunnel);
void AddOutboundTunnel (std::shared_ptr<OutboundTunnel> newTunnel); void AddOutboundTunnel (std::shared_ptr<OutboundTunnel> newTunnel);
void AddInboundTunnel (std::shared_ptr<InboundTunnel> newTunnel); void AddInboundTunnel (std::shared_ptr<InboundTunnel> newTunnel);
void PostTunnelData (I2NPMessage * msg); void PostTunnelData (std::shared_ptr<I2NPMessage> msg);
void PostTunnelData (const std::vector<I2NPMessage *>& msgs); void PostTunnelData (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
template<class TTunnel> template<class TTunnel>
std::shared_ptr<TTunnel> CreateTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<OutboundTunnel> outboundTunnel = nullptr); std::shared_ptr<TTunnel> CreateTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<OutboundTunnel> outboundTunnel = nullptr);
void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<InboundTunnel> tunnel); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<InboundTunnel> tunnel);
@ -150,7 +150,7 @@ namespace tunnel
template<class TTunnel> template<class TTunnel>
std::shared_ptr<TTunnel> GetPendingTunnel (uint32_t replyMsgID, const std::map<uint32_t, std::shared_ptr<TTunnel> >& pendingTunnels); std::shared_ptr<TTunnel> GetPendingTunnel (uint32_t replyMsgID, const std::map<uint32_t, std::shared_ptr<TTunnel> >& pendingTunnels);
void HandleTunnelGatewayMsg (TunnelBase * tunnel, I2NPMessage * msg); void HandleTunnelGatewayMsg (TunnelBase * tunnel, std::shared_ptr<I2NPMessage> msg);
void Run (); void Run ();
void ManageTunnels (); void ManageTunnels ();
@ -177,7 +177,7 @@ namespace tunnel
std::mutex m_PoolsMutex; std::mutex m_PoolsMutex;
std::list<std::shared_ptr<TunnelPool>> m_Pools; std::list<std::shared_ptr<TunnelPool>> m_Pools;
std::shared_ptr<TunnelPool> m_ExploratoryPool; std::shared_ptr<TunnelPool> m_ExploratoryPool;
i2p::util::Queue<I2NPMessage *> m_Queue; i2p::util::Queue<std::shared_ptr<I2NPMessage> > m_Queue;
// some stats // some stats
int m_NumSuccesiveTunnelCreations, m_NumFailedTunnelCreations; int m_NumSuccesiveTunnelCreations, m_NumFailedTunnelCreations;

8
TunnelBase.h

@ -26,7 +26,7 @@ namespace tunnel
TunnelDeliveryType deliveryType; TunnelDeliveryType deliveryType;
i2p::data::IdentHash hash; i2p::data::IdentHash hash;
uint32_t tunnelID; uint32_t tunnelID;
I2NPMessage * data; std::shared_ptr<I2NPMessage> data;
}; };
class TunnelBase class TunnelBase
@ -37,10 +37,10 @@ namespace tunnel
TunnelBase (): m_CreationTime (i2p::util::GetSecondsSinceEpoch ()) {}; TunnelBase (): m_CreationTime (i2p::util::GetSecondsSinceEpoch ()) {};
virtual ~TunnelBase () {}; virtual ~TunnelBase () {};
virtual void HandleTunnelDataMsg (i2p::I2NPMessage * tunnelMsg) = 0; virtual void HandleTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> tunnelMsg) = 0;
virtual void SendTunnelDataMsg (i2p::I2NPMessage * msg) = 0; virtual void SendTunnelDataMsg (std::shared_ptr<i2p::I2NPMessage> msg) = 0;
virtual void FlushTunnelDataMsgs () {}; virtual void FlushTunnelDataMsgs () {};
virtual void EncryptTunnelMsg (I2NPMessage * tunnelMsg) = 0; virtual void EncryptTunnelMsg (std::shared_ptr<I2NPMessage> tunnelMsg) = 0;
virtual uint32_t GetNextTunnelID () const = 0; virtual uint32_t GetNextTunnelID () const = 0;
virtual const i2p::data::IdentHash& GetNextIdentHash () const = 0; virtual const i2p::data::IdentHash& GetNextIdentHash () const = 0;
virtual uint32_t GetTunnelID () const = 0; // as known at our side virtual uint32_t GetTunnelID () const = 0; // as known at our side

37
TunnelEndpoint.cpp

@ -13,13 +13,9 @@ namespace tunnel
{ {
TunnelEndpoint::~TunnelEndpoint () TunnelEndpoint::~TunnelEndpoint ()
{ {
for (auto it: m_IncompleteMessages)
i2p::DeleteI2NPMessage (it.second.data);
for (auto it: m_OutOfSequenceFragments)
i2p::DeleteI2NPMessage (it.second.data);
} }
void TunnelEndpoint::HandleDecryptedTunnelDataMsg (I2NPMessage * msg) void TunnelEndpoint::HandleDecryptedTunnelDataMsg (std::shared_ptr<I2NPMessage> msg)
{ {
m_NumReceivedBytes += TUNNEL_DATA_MSG_SIZE; m_NumReceivedBytes += TUNNEL_DATA_MSG_SIZE;
@ -35,7 +31,6 @@ namespace tunnel
if (memcmp (hash, decrypted, 4)) if (memcmp (hash, decrypted, 4))
{ {
LogPrint (eLogError, "TunnelMessage: checksum verification failed"); LogPrint (eLogError, "TunnelMessage: checksum verification failed");
i2p::DeleteI2NPMessage (msg);
return; return;
} }
// process fragments // process fragments
@ -97,7 +92,7 @@ namespace tunnel
if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE) if (fragment + size < decrypted + TUNNEL_DATA_ENCRYPTED_SIZE)
{ {
// this is not last message. we have to copy it // this is not last message. we have to copy it
m.data = NewI2NPShortMessage (); m.data = ToSharedI2NPMessage (NewI2NPShortMessage ());
m.data->offset += TUNNEL_GATEWAY_HEADER_SIZE; // reserve room for TunnelGateway header m.data->offset += TUNNEL_GATEWAY_HEADER_SIZE; // reserve room for TunnelGateway header
m.data->len += TUNNEL_GATEWAY_HEADER_SIZE; m.data->len += TUNNEL_GATEWAY_HEADER_SIZE;
*(m.data) = *msg; *(m.data) = *msg;
@ -118,10 +113,7 @@ namespace tunnel
if (ret.second) if (ret.second)
HandleOutOfSequenceFragment (msgID, ret.first->second); HandleOutOfSequenceFragment (msgID, ret.first->second);
else else
{
LogPrint (eLogError, "Incomplete message ", msgID, "already exists"); LogPrint (eLogError, "Incomplete message ", msgID, "already exists");
DeleteI2NPMessage (m.data);
}
} }
else else
{ {
@ -130,20 +122,14 @@ namespace tunnel
} }
} }
else else
{
LogPrint (eLogError, "Message is fragmented, but msgID is not presented"); LogPrint (eLogError, "Message is fragmented, but msgID is not presented");
DeleteI2NPMessage (m.data);
}
} }
fragment += size; fragment += size;
} }
} }
else else
{
LogPrint (eLogError, "TunnelMessage: zero not found"); LogPrint (eLogError, "TunnelMessage: zero not found");
i2p::DeleteI2NPMessage (msg);
}
} }
void TunnelEndpoint::HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m) void TunnelEndpoint::HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m)
@ -161,9 +147,8 @@ namespace tunnel
if (msg.data->len + size > msg.data->maxLen) if (msg.data->len + size > msg.data->maxLen)
{ {
LogPrint (eLogInfo, "Tunnel endpoint I2NP message size ", msg.data->maxLen, " is not enough"); LogPrint (eLogInfo, "Tunnel endpoint I2NP message size ", msg.data->maxLen, " is not enough");
I2NPMessage * newMsg = NewI2NPMessage (); auto newMsg = ToSharedI2NPMessage (NewI2NPMessage ());
*newMsg = *(msg.data); *newMsg = *(msg.data);
DeleteI2NPMessage (msg.data);
msg.data = newMsg; msg.data = newMsg;
} }
memcpy (msg.data->buf + msg.data->len, fragment, size); // concatenate fragment memcpy (msg.data->buf + msg.data->len, fragment, size); // concatenate fragment
@ -183,10 +168,8 @@ namespace tunnel
else else
{ {
LogPrint (eLogError, "Fragment ", m.nextFragmentNum, " of message ", msgID, "exceeds max I2NP message size. Message dropped"); LogPrint (eLogError, "Fragment ", m.nextFragmentNum, " of message ", msgID, "exceeds max I2NP message size. Message dropped");
i2p::DeleteI2NPMessage (msg.data);
m_IncompleteMessages.erase (it); m_IncompleteMessages.erase (it);
} }
i2p::DeleteI2NPMessage (m.data);
} }
else else
{ {
@ -201,13 +184,11 @@ namespace tunnel
} }
} }
void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, I2NPMessage * data) void TunnelEndpoint::AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data)
{ {
auto it = m_OutOfSequenceFragments.find (msgID); auto it = m_OutOfSequenceFragments.find (msgID);
if (it == m_OutOfSequenceFragments.end ()) if (it == m_OutOfSequenceFragments.end ())
m_OutOfSequenceFragments.insert (std::pair<uint32_t, Fragment> (msgID, {fragmentNum, isLastFragment, data})); m_OutOfSequenceFragments.insert (std::pair<uint32_t, Fragment> (msgID, {fragmentNum, isLastFragment, data}));
else
i2p::DeleteI2NPMessage (data);
} }
void TunnelEndpoint::HandleOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg) void TunnelEndpoint::HandleOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg)
@ -222,9 +203,8 @@ namespace tunnel
if (msg.data->len + size > msg.data->maxLen) if (msg.data->len + size > msg.data->maxLen)
{ {
LogPrint (eLogInfo, "Tunnel endpoint I2NP message size ", msg.data->maxLen, " is not enough"); LogPrint (eLogInfo, "Tunnel endpoint I2NP message size ", msg.data->maxLen, " is not enough");
I2NPMessage * newMsg = NewI2NPMessage (); auto newMsg = ToSharedI2NPMessage (NewI2NPMessage ());
*newMsg = *(msg.data); *newMsg = *(msg.data);
DeleteI2NPMessage (msg.data);
msg.data = newMsg; msg.data = newMsg;
} }
memcpy (msg.data->buf + msg.data->len, it->second.data->GetBuffer (), size); // concatenate out-of-sync fragment memcpy (msg.data->buf + msg.data->len, it->second.data->GetBuffer (), size); // concatenate out-of-sync fragment
@ -237,7 +217,6 @@ namespace tunnel
} }
else else
msg.nextFragmentNum++; msg.nextFragmentNum++;
i2p::DeleteI2NPMessage (it->second.data);
m_OutOfSequenceFragments.erase (it); m_OutOfSequenceFragments.erase (it);
} }
} }
@ -273,17 +252,11 @@ namespace tunnel
i2p::transport::transports.SendMessage (msg.hash, msg.data); i2p::transport::transports.SendMessage (msg.hash, msg.data);
} }
else // we shouldn't send this message. possible leakage else // we shouldn't send this message. possible leakage
{
LogPrint (eLogError, "Message to another router arrived from an inbound tunnel. Dropped"); LogPrint (eLogError, "Message to another router arrived from an inbound tunnel. Dropped");
i2p::DeleteI2NPMessage (msg.data);
}
} }
break; break;
default: default:
{
LogPrint (eLogError, "TunnelMessage: Unknown delivery type ", (int)msg.deliveryType); LogPrint (eLogError, "TunnelMessage: Unknown delivery type ", (int)msg.deliveryType);
i2p::DeleteI2NPMessage (msg.data);
}
}; };
} }
} }

6
TunnelEndpoint.h

@ -22,7 +22,7 @@ namespace tunnel
{ {
uint8_t fragmentNum; uint8_t fragmentNum;
bool isLastFragment; bool isLastFragment;
I2NPMessage * data; std::shared_ptr<I2NPMessage> data;
}; };
public: public:
@ -31,14 +31,14 @@ namespace tunnel
~TunnelEndpoint (); ~TunnelEndpoint ();
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
void HandleDecryptedTunnelDataMsg (I2NPMessage * msg); void HandleDecryptedTunnelDataMsg (std::shared_ptr<I2NPMessage> msg);
private: private:
void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m); void HandleFollowOnFragment (uint32_t msgID, bool isLastFragment, const TunnelMessageBlockEx& m);
void HandleNextMessage (const TunnelMessageBlock& msg); void HandleNextMessage (const TunnelMessageBlock& msg);
void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, I2NPMessage * data); void AddOutOfSequenceFragment (uint32_t msgID, uint8_t fragmentNum, bool isLastFragment, std::shared_ptr<I2NPMessage> data);
void HandleOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg); void HandleOutOfSequenceFragment (uint32_t msgID, TunnelMessageBlockEx& msg);
private: private:

6
TunnelGateway.cpp

@ -40,7 +40,7 @@ namespace tunnel
di[0] = block.deliveryType << 5; // set delivery type di[0] = block.deliveryType << 5; // set delivery type
// create fragments // create fragments
I2NPMessage * msg = block.data; std::shared_ptr<I2NPMessage> msg = block.data;
auto fullMsgLen = diLen + msg->GetLength () + 2; // delivery instructions + payload + 2 bytes length auto fullMsgLen = diLen + msg->GetLength () + 2; // delivery instructions + payload + 2 bytes length
if (fullMsgLen <= m_RemainingSize) if (fullMsgLen <= m_RemainingSize)
{ {
@ -53,7 +53,6 @@ namespace tunnel
m_RemainingSize -= diLen + msg->GetLength (); m_RemainingSize -= diLen + msg->GetLength ();
if (!m_RemainingSize) if (!m_RemainingSize)
CompleteCurrentTunnelDataMessage (); CompleteCurrentTunnelDataMessage ();
DeleteI2NPMessage (msg);
} }
else else
{ {
@ -117,7 +116,6 @@ namespace tunnel
size += s; size += s;
fragmentNumber++; fragmentNumber++;
} }
DeleteI2NPMessage (msg);
} }
else else
{ {
@ -190,7 +188,7 @@ namespace tunnel
auto tunnelMsgs = m_Buffer.GetTunnelDataMsgs (); auto tunnelMsgs = m_Buffer.GetTunnelDataMsgs ();
for (auto tunnelMsg : tunnelMsgs) for (auto tunnelMsg : tunnelMsgs)
{ {
m_Tunnel->EncryptTunnelMsg (tunnelMsg.get ()); // TODO: m_Tunnel->EncryptTunnelMsg (tunnelMsg);
FillI2NPMessageHeader (tunnelMsg.get (), eI2NPTunnelData); // TODO: FillI2NPMessageHeader (tunnelMsg.get (), eI2NPTunnelData); // TODO:
m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; m_NumSentBytes += TUNNEL_DATA_MSG_SIZE;
} }

Loading…
Cancel
Save