From 7ea0249e6effc7ea5814883a11ad6d3c6773f59c Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 10 Jan 2017 21:31:52 -0500 Subject: [PATCH] use memory poll for streaming --- Streaming.cpp | 34 +++++++++++++++++----------------- Streaming.h | 8 +++++++- util.h | 3 ++- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index ff14aadb..912e21e5 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -40,15 +40,15 @@ namespace stream { auto packet = m_ReceiveQueue.front (); m_ReceiveQueue.pop (); - delete packet; + m_LocalDestination.DeletePacket (packet); } for (auto it: m_SentPackets) - delete it; + m_LocalDestination.DeletePacket (it); m_SentPackets.clear (); for (auto it: m_SavedPackets) - delete it; + m_LocalDestination.DeletePacket (it); m_SavedPackets.clear (); LogPrint (eLogDebug, "Streaming: Stream deleted"); @@ -83,7 +83,7 @@ namespace stream { // plain ack LogPrint (eLogDebug, "Streaming: Plain ACK received"); - delete packet; + m_LocalDestination.DeletePacket (packet); return; } @@ -131,7 +131,7 @@ namespace stream // we have received duplicate LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); SendQuickAck (); // resend ack for previous message again - delete packet; // packet dropped + m_LocalDestination.DeletePacket (packet); // packet dropped } else { @@ -163,7 +163,7 @@ namespace stream void Stream::SavePacket (Packet * packet) { if (!m_SavedPackets.insert (packet).second) - delete packet; + m_LocalDestination.DeletePacket (packet); } void Stream::ProcessPacket (Packet * packet) @@ -216,7 +216,7 @@ namespace stream m_ReceiveTimer.cancel (); } else - delete packet; + m_LocalDestination.DeletePacket (packet); m_LastReceivedSequenceNumber = receivedSeqn; @@ -278,7 +278,7 @@ namespace stream m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); - delete sentPacket; + m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start @@ -345,7 +345,7 @@ namespace stream std::unique_lock l(m_SendBufferMutex); while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) { - Packet * p = new Packet (); + Packet * p = m_LocalDestination.NewPacket (); uint8_t * packet = p->GetBuffer (); // TODO: implement setters size_t size = 0; @@ -532,7 +532,7 @@ namespace stream void Stream::SendClose () { - Packet * p = new Packet (); + Packet * p = m_LocalDestination.NewPacket (); uint8_t * packet = p->GetBuffer (); size_t size = 0; htobe32buf (packet + size, m_SendStreamID); @@ -574,7 +574,7 @@ namespace stream if (!packet->GetLength ()) { m_ReceiveQueue.pop (); - delete packet; + m_LocalDestination.DeletePacket (packet); } } return pos; @@ -852,7 +852,7 @@ namespace stream { for (auto& it: m_SavedPackets) { - for (auto it1: it.second) delete it1; + for (auto it1: it.second) DeletePacket (it1); it.second.clear (); } m_SavedPackets.clear (); @@ -889,7 +889,7 @@ namespace stream else { LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); - delete packet; + DeletePacket (packet); } } else @@ -901,7 +901,7 @@ namespace stream { // already pending LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); - delete packet; // drop it, because previous should be connected + DeletePacket (packet); // drop it, because previous should be connected return; } auto incomingStream = CreateNewIncomingStream (); @@ -980,7 +980,7 @@ namespace stream auto it = s->m_SavedPackets.find (receiveStreamID); if (it != s->m_SavedPackets.end ()) { - for (auto it1: it->second) delete it1; + for (auto it1: it->second) s->DeletePacket (it1); it->second.clear (); s->m_SavedPackets.erase (it); } @@ -1074,13 +1074,13 @@ namespace stream void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) { // unzip it - Packet * uncompressed = new Packet; + Packet * uncompressed = NewPacket (); uncompressed->offset = 0; uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE); if (uncompressed->len) HandleNextPacket (uncompressed); else - delete uncompressed; + DeletePacket (uncompressed); } std::shared_ptr StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort) diff --git a/Streaming.h b/Streaming.h index 1593f961..828fccc7 100644 --- a/Streaming.h +++ b/Streaming.h @@ -18,6 +18,7 @@ #include "I2NPProtocol.h" #include "Garlic.h" #include "Tunnel.h" +#include "util.h" // MemoryPool namespace i2p { @@ -234,6 +235,9 @@ namespace stream /** set max connections per minute per destination */ void SetMaxConnsPerMinute(const uint32_t conns); + + Packet * NewPacket () { return m_PacketsPool.Acquire (); }; + void DeletePacket (Packet * p) { if (p) m_PacketsPool.Release (p); }; private: @@ -269,7 +273,9 @@ namespace stream /** banned identities */ std::vector m_Banned; uint64_t m_LastBanClear; - + + i2p::util::MemoryPool m_PacketsPool; + public: i2p::data::GzipInflator m_Inflator; diff --git a/util.h b/util.h index ac2c38d1..84299d25 100644 --- a/util.h +++ b/util.h @@ -58,8 +58,9 @@ namespace util void Release (T * t) { + if (!t) return; t->~T (); - *(void * *)t = m_Head; + *(void * *)t = m_Head; // next m_Head = t; }