Browse Source

replace ReceiveQueue to std::queue

pull/64/head
orignal 10 years ago
parent
commit
e1027ffb7b
  1. 35
      Streaming.cpp
  2. 6
      Streaming.h

35
Streaming.cpp

@ -28,8 +28,12 @@ namespace stream
Stream::~Stream () Stream::~Stream ()
{ {
m_ReceiveTimer.cancel (); m_ReceiveTimer.cancel ();
while (auto packet = m_ReceiveQueue.Get ()) while (!m_ReceiveQueue.empty ())
{
auto packet = m_ReceiveQueue.front ();
m_ReceiveQueue.pop ();
delete packet; delete packet;
}
for (auto it: m_SavedPackets) for (auto it: m_SavedPackets)
delete it; delete it;
} }
@ -118,7 +122,7 @@ namespace stream
packet->offset = packet->GetPayload () - packet->buf; packet->offset = packet->GetPayload () - packet->buf;
if (packet->GetLength () > 0) if (packet->GetLength () > 0)
{ {
m_ReceiveQueue.Put (packet); m_ReceiveQueue.push (packet);
m_ReceiveTimer.cancel (); m_ReceiveTimer.cancel ();
} }
else else
@ -131,7 +135,6 @@ namespace stream
LogPrint ("Closed"); LogPrint ("Closed");
SendQuickAck (); // send ack for close explicitly? SendQuickAck (); // send ack for close explicitly?
m_IsOpen = false; m_IsOpen = false;
m_ReceiveQueue.WakeUp ();
} }
} }
@ -239,30 +242,24 @@ namespace stream
if (SendPacket (packet, size)) if (SendPacket (packet, size))
LogPrint ("FIN sent"); LogPrint ("FIN sent");
m_ReceiveQueue.WakeUp ();
} }
} }
size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len)
{ {
size_t pos = 0; size_t pos = 0;
while (pos < len) while (pos < len && !m_ReceiveQueue.empty ())
{ {
Packet * packet = m_ReceiveQueue.Peek (); Packet * packet = m_ReceiveQueue.front ();
if (packet) size_t l = std::min (packet->GetLength (), len - pos);
memcpy (buf + pos, packet->GetBuffer (), l);
pos += l;
packet->offset += l;
if (!packet->GetLength ())
{ {
size_t l = std::min (packet->GetLength (), len - pos); m_ReceiveQueue.pop ();
memcpy (buf + pos, packet->GetBuffer (), l); delete packet;
pos += l; }
packet->offset += l;
if (!packet->GetLength ())
{
m_ReceiveQueue.Get ();
delete packet;
}
}
else // no more data available
break;
} }
return pos; return pos;
} }

6
Streaming.h

@ -5,12 +5,12 @@
#include <string> #include <string>
#include <map> #include <map>
#include <set> #include <set>
#include <queue>
#include <thread> #include <thread>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <cryptopp/dsa.h> #include <cryptopp/dsa.h>
#include "I2PEndian.h" #include "I2PEndian.h"
#include "Queue.h"
#include "Identity.h" #include "Identity.h"
#include "LeaseSet.h" #include "LeaseSet.h"
#include "I2NPProtocol.h" #include "I2NPProtocol.h"
@ -112,7 +112,7 @@ namespace stream
StreamingDestination * m_LocalDestination; StreamingDestination * m_LocalDestination;
const i2p::data::LeaseSet& m_RemoteLeaseSet; const i2p::data::LeaseSet& m_RemoteLeaseSet;
i2p::data::Lease m_CurrentRemoteLease; i2p::data::Lease m_CurrentRemoteLease;
i2p::util::Queue<Packet> m_ReceiveQueue; std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
i2p::tunnel::OutboundTunnel * m_OutboundTunnel; i2p::tunnel::OutboundTunnel * m_OutboundTunnel;
boost::asio::deadline_timer m_ReceiveTimer; boost::asio::deadline_timer m_ReceiveTimer;
@ -204,7 +204,7 @@ namespace stream
template<typename Buffer, typename ReceiveHandler> template<typename Buffer, typename ReceiveHandler>
void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout)
{ {
if (!m_ReceiveQueue.IsEmpty ()) if (!m_ReceiveQueue.empty ())
{ {
size_t received = ConcatenatePackets (boost::asio::buffer_cast<uint8_t *>(buffer), boost::asio::buffer_size(buffer)); size_t received = ConcatenatePackets (boost::asio::buffer_cast<uint8_t *>(buffer), boost::asio::buffer_size(buffer));
if (received) if (received)

Loading…
Cancel
Save