Browse Source

use memory poll for streaming

pull/777/head
orignal 8 years ago
parent
commit
7ea0249e6e
  1. 34
      Streaming.cpp
  2. 6
      Streaming.h
  3. 3
      util.h

34
Streaming.cpp

@ -40,15 +40,15 @@ namespace stream
{ {
auto packet = m_ReceiveQueue.front (); auto packet = m_ReceiveQueue.front ();
m_ReceiveQueue.pop (); m_ReceiveQueue.pop ();
delete packet; m_LocalDestination.DeletePacket (packet);
} }
for (auto it: m_SentPackets) for (auto it: m_SentPackets)
delete it; m_LocalDestination.DeletePacket (it);
m_SentPackets.clear (); m_SentPackets.clear ();
for (auto it: m_SavedPackets) for (auto it: m_SavedPackets)
delete it; m_LocalDestination.DeletePacket (it);
m_SavedPackets.clear (); m_SavedPackets.clear ();
LogPrint (eLogDebug, "Streaming: Stream deleted"); LogPrint (eLogDebug, "Streaming: Stream deleted");
@ -83,7 +83,7 @@ namespace stream
{ {
// plain ack // plain ack
LogPrint (eLogDebug, "Streaming: Plain ACK received"); LogPrint (eLogDebug, "Streaming: Plain ACK received");
delete packet; m_LocalDestination.DeletePacket (packet);
return; return;
} }
@ -131,7 +131,7 @@ namespace stream
// we have received duplicate // we have received duplicate
LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID);
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped m_LocalDestination.DeletePacket (packet); // packet dropped
} }
else else
{ {
@ -163,7 +163,7 @@ namespace stream
void Stream::SavePacket (Packet * packet) void Stream::SavePacket (Packet * packet)
{ {
if (!m_SavedPackets.insert (packet).second) if (!m_SavedPackets.insert (packet).second)
delete packet; m_LocalDestination.DeletePacket (packet);
} }
void Stream::ProcessPacket (Packet * packet) void Stream::ProcessPacket (Packet * packet)
@ -216,7 +216,7 @@ namespace stream
m_ReceiveTimer.cancel (); m_ReceiveTimer.cancel ();
} }
else else
delete packet; m_LocalDestination.DeletePacket (packet);
m_LastReceivedSequenceNumber = receivedSeqn; m_LastReceivedSequenceNumber = receivedSeqn;
@ -278,7 +278,7 @@ namespace stream
m_RTO = m_RTT*1.5; // TODO: implement it better m_RTO = m_RTT*1.5; // TODO: implement it better
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime);
m_SentPackets.erase (it++); m_SentPackets.erase (it++);
delete sentPacket; m_LocalDestination.DeletePacket (sentPacket);
acknowledged = true; acknowledged = true;
if (m_WindowSize < WINDOW_SIZE) if (m_WindowSize < WINDOW_SIZE)
m_WindowSize++; // slow start m_WindowSize++; // slow start
@ -345,7 +345,7 @@ namespace stream
std::unique_lock<std::mutex> l(m_SendBufferMutex); std::unique_lock<std::mutex> l(m_SendBufferMutex);
while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0))
{ {
Packet * p = new Packet (); Packet * p = m_LocalDestination.NewPacket ();
uint8_t * packet = p->GetBuffer (); uint8_t * packet = p->GetBuffer ();
// TODO: implement setters // TODO: implement setters
size_t size = 0; size_t size = 0;
@ -532,7 +532,7 @@ namespace stream
void Stream::SendClose () void Stream::SendClose ()
{ {
Packet * p = new Packet (); Packet * p = m_LocalDestination.NewPacket ();
uint8_t * packet = p->GetBuffer (); uint8_t * packet = p->GetBuffer ();
size_t size = 0; size_t size = 0;
htobe32buf (packet + size, m_SendStreamID); htobe32buf (packet + size, m_SendStreamID);
@ -574,7 +574,7 @@ namespace stream
if (!packet->GetLength ()) if (!packet->GetLength ())
{ {
m_ReceiveQueue.pop (); m_ReceiveQueue.pop ();
delete packet; m_LocalDestination.DeletePacket (packet);
} }
} }
return pos; return pos;
@ -852,7 +852,7 @@ namespace stream
{ {
for (auto& it: m_SavedPackets) for (auto& it: m_SavedPackets)
{ {
for (auto it1: it.second) delete it1; for (auto it1: it.second) DeletePacket (it1);
it.second.clear (); it.second.clear ();
} }
m_SavedPackets.clear (); m_SavedPackets.clear ();
@ -889,7 +889,7 @@ namespace stream
else else
{ {
LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID);
delete packet; DeletePacket (packet);
} }
} }
else else
@ -901,7 +901,7 @@ namespace stream
{ {
// already pending // already pending
LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists");
delete packet; // drop it, because previous should be connected DeletePacket (packet); // drop it, because previous should be connected
return; return;
} }
auto incomingStream = CreateNewIncomingStream (); auto incomingStream = CreateNewIncomingStream ();
@ -980,7 +980,7 @@ namespace stream
auto it = s->m_SavedPackets.find (receiveStreamID); auto it = s->m_SavedPackets.find (receiveStreamID);
if (it != s->m_SavedPackets.end ()) if (it != s->m_SavedPackets.end ())
{ {
for (auto it1: it->second) delete it1; for (auto it1: it->second) s->DeletePacket (it1);
it->second.clear (); it->second.clear ();
s->m_SavedPackets.erase (it); s->m_SavedPackets.erase (it);
} }
@ -1074,13 +1074,13 @@ namespace stream
void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len)
{ {
// unzip it // unzip it
Packet * uncompressed = new Packet; Packet * uncompressed = NewPacket ();
uncompressed->offset = 0; uncompressed->offset = 0;
uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE); uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE);
if (uncompressed->len) if (uncompressed->len)
HandleNextPacket (uncompressed); HandleNextPacket (uncompressed);
else else
delete uncompressed; DeletePacket (uncompressed);
} }
std::shared_ptr<I2NPMessage> StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort) std::shared_ptr<I2NPMessage> StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort)

6
Streaming.h

@ -18,6 +18,7 @@
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
#include "Garlic.h" #include "Garlic.h"
#include "Tunnel.h" #include "Tunnel.h"
#include "util.h" // MemoryPool
namespace i2p namespace i2p
{ {
@ -235,6 +236,9 @@ namespace stream
/** set max connections per minute per destination */ /** set max connections per minute per destination */
void SetMaxConnsPerMinute(const uint32_t conns); void SetMaxConnsPerMinute(const uint32_t conns);
Packet * NewPacket () { return m_PacketsPool.Acquire (); };
void DeletePacket (Packet * p) { if (p) m_PacketsPool.Release (p); };
private: private:
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
@ -270,6 +274,8 @@ namespace stream
std::vector<i2p::data::IdentHash> m_Banned; std::vector<i2p::data::IdentHash> m_Banned;
uint64_t m_LastBanClear; uint64_t m_LastBanClear;
i2p::util::MemoryPool<Packet> m_PacketsPool;
public: public:
i2p::data::GzipInflator m_Inflator; i2p::data::GzipInflator m_Inflator;

3
util.h

@ -58,8 +58,9 @@ namespace util
void Release (T * t) void Release (T * t)
{ {
if (!t) return;
t->~T (); t->~T ();
*(void * *)t = m_Head; *(void * *)t = m_Head; // next
m_Head = t; m_Head = t;
} }

Loading…
Cancel
Save