Browse Source

replaced stringstream by a list of buffers

pull/824/head
orignal 8 years ago
parent
commit
04dc34260f
  1. 98
      Streaming.cpp
  2. 50
      Streaming.h

98
Streaming.cpp

@ -11,6 +11,50 @@ namespace i2p
{ {
namespace stream namespace stream
{ {
void SendBufferQueue::Add (const uint8_t * buf, size_t len, SendHandler handler)
{
m_Buffers.push_back (std::make_shared<SendBuffer>(buf, len, handler));
m_Size += len;
}
size_t SendBufferQueue::Get (uint8_t * buf, size_t len)
{
size_t offset = 0;
while (!m_Buffers.empty () && offset < len)
{
auto nextBuffer = m_Buffers.front ();
auto rem = nextBuffer->GetRemainingSize ();
if (offset + rem <= len)
{
// whole buffer
memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem);
offset += rem;
m_Buffers.pop_front (); // delete it
}
else
{
// partially
rem = len - offset;
memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), len - offset);
nextBuffer->offset += (len - offset);
offset = len; // break
}
}
m_Size -= offset;
return offset;
}
void SendBufferQueue::CleanUp ()
{
if (!m_Buffers.empty ())
{
for (auto it: m_Buffers)
it->Cancel ();
m_Buffers.clear ();
m_Size = 0;
}
}
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, Stream::Stream (boost::asio::io_service& service, StreamingDestination& local,
std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service), std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service),
m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
@ -45,19 +89,16 @@ namespace stream
m_AckSendTimer.cancel (); m_AckSendTimer.cancel ();
m_ReceiveTimer.cancel (); m_ReceiveTimer.cancel ();
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
if (m_SendHandler)
{
auto handler = m_SendHandler;
m_SendHandler = nullptr;
handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted));
}
//CleanUp (); /* Need to recheck - broke working on windows */ //CleanUp (); /* Need to recheck - broke working on windows */
m_LocalDestination.DeleteStream (shared_from_this ()); m_LocalDestination.DeleteStream (shared_from_this ());
} }
void Stream::CleanUp () void Stream::CleanUp ()
{ {
m_SendBuffer.str (""); {
std::unique_lock<std::mutex> l(m_SendBufferMutex);
m_SendBuffer.CleanUp ();
}
while (!m_ReceiveQueue.empty ()) while (!m_ReceiveQueue.empty ())
{ {
auto packet = m_ReceiveQueue.front (); auto packet = m_ReceiveQueue.front ();
@ -321,23 +362,21 @@ namespace stream
size_t Stream::Send (const uint8_t * buf, size_t len) size_t Stream::Send (const uint8_t * buf, size_t len)
{ {
if (len > 0 && buf) // TODO: check max buffer size
{ AsyncSend (buf, len, nullptr);
std::unique_lock<std::mutex> l(m_SendBufferMutex);
m_SendBuffer.clear ();
m_SendBuffer.write ((const char *)buf, len);
}
m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ()));
return len; return len;
} }
void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler) void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler)
{ {
if (m_SendHandler) if (len > 0 && buf)
handler (boost::asio::error::make_error_code (boost::asio::error::in_progress)); {
else std::unique_lock<std::mutex> l(m_SendBufferMutex);
m_SendHandler = handler; m_SendBuffer.Add (buf, len, handler);
Send (buf, len); }
else if (handler)
handler(boost::system::error_code ());
m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ()));
} }
void Stream::SendBuffer () void Stream::SendBuffer ()
@ -349,7 +388,7 @@ namespace stream
std::vector<Packet *> packets; std::vector<Packet *> packets;
{ {
std::unique_lock<std::mutex> l(m_SendBufferMutex); std::unique_lock<std::mutex> l(m_SendBufferMutex);
while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0))
{ {
Packet * p = m_LocalDestination.NewPacket (); Packet * p = m_LocalDestination.NewPacket ();
uint8_t * packet = p->GetBuffer (); uint8_t * packet = p->GetBuffer ();
@ -390,8 +429,7 @@ namespace stream
uint8_t * signature = packet + size; // set it later uint8_t * signature = packet + size; // set it later
memset (signature, 0, signatureLen); // zeroes for now memset (signature, 0, signatureLen); // zeroes for now
size += signatureLen; // signature size += signatureLen; // signature
m_SendBuffer.read ((char *)(packet + size), STREAMING_MTU - size); size += m_SendBuffer.Get (packet + size, STREAMING_MTU - size); // payload
size += m_SendBuffer.gcount (); // payload
m_LocalDestination.GetOwner ()->Sign (packet, size, signature); m_LocalDestination.GetOwner ()->Sign (packet, size, signature);
} }
else else
@ -401,22 +439,12 @@ namespace stream
size += 2; // flags size += 2; // flags
htobuf16 (packet + size, 0); // no options htobuf16 (packet + size, 0); // no options
size += 2; // options size size += 2; // options size
m_SendBuffer.read((char *)(packet + size), STREAMING_MTU - size); size += m_SendBuffer.Get(packet + size, STREAMING_MTU - size); // payload
size += m_SendBuffer.gcount (); // payload
} }
p->len = size; p->len = size;
packets.push_back (p); packets.push_back (p);
numMsgs--; numMsgs--;
} }
if (m_SendBuffer.eof ())
{
m_SendBuffer.str(""); // clean up buffer
if (m_SendHandler)
{
m_SendHandler (boost::system::error_code ());
m_SendHandler = nullptr;
}
}
} }
if (packets.size () > 0) if (packets.size () > 0)
{ {
@ -433,7 +461,7 @@ namespace stream
m_SentPackets.insert (it); m_SentPackets.insert (it);
} }
SendPackets (packets); SendPackets (packets);
if (m_Status == eStreamStatusClosing && m_SendBuffer.eof ()) if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ())
SendClose (); SendClose ();
if (isEmpty) if (isEmpty)
ScheduleResend (); ScheduleResend ();
@ -525,7 +553,7 @@ namespace stream
Terminate (); Terminate ();
break; break;
case eStreamStatusClosing: case eStreamStatusClosing:
if (m_SentPackets.empty () && m_SendBuffer.eof ()) // nothing to send if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send
{ {
m_Status = eStreamStatusClosed; m_Status = eStreamStatusClosed;
SendClose (); SendClose ();

50
Streaming.h

@ -3,7 +3,6 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <sstream>
#include <map> #include <map>
#include <set> #include <set>
#include <queue> #include <queue>
@ -104,6 +103,48 @@ namespace stream
}; };
}; };
typedef std::function<void (const boost::system::error_code& ecode)> SendHandler;
struct SendBuffer
{
uint8_t * buf;
size_t len, offset;
SendHandler handler;
SendBuffer (const uint8_t * b, size_t l, SendHandler h):
len(l), offset (0), handler(h)
{
buf = new uint8_t[len];
memcpy (buf, b, len);
}
~SendBuffer ()
{
delete[] buf;
if (handler) handler(boost::system::error_code ());
}
size_t GetRemainingSize () const { return len - offset; };
const uint8_t * GetRemaningBuffer () const { return buf + offset; };
void Cancel () { if (handler) handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); handler = nullptr; };
};
class SendBufferQueue
{
public:
SendBufferQueue (): m_Size (0) {};
~SendBufferQueue () { CleanUp (); };
void Add (const uint8_t * buf, size_t len, SendHandler handler);
size_t Get (uint8_t * buf, size_t len);
size_t GetSize () const { return m_Size; };
bool IsEmpty () const { return m_Buffers.empty (); };
void CleanUp ();
private:
std::list<std::shared_ptr<SendBuffer> > m_Buffers;
size_t m_Size;
};
enum StreamStatus enum StreamStatus
{ {
eStreamStatusNew = 0, eStreamStatusNew = 0,
@ -118,8 +159,6 @@ namespace stream
{ {
public: public:
typedef std::function<void (const boost::system::error_code& ecode)> SendHandler;
Stream (boost::asio::io_service& service, StreamingDestination& local, Stream (boost::asio::io_service& service, StreamingDestination& local,
std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); // outgoing std::shared_ptr<const i2p::data::LeaseSet> remote, int port = 0); // outgoing
Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming
@ -149,7 +188,7 @@ namespace stream
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
size_t GetSendQueueSize () const { return m_SentPackets.size (); }; size_t GetSendQueueSize () const { return m_SentPackets.size (); };
size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); }; size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); };
size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); }; size_t GetSendBufferSize () const { return m_SendBuffer.GetSize (); };
int GetWindowSize () const { return m_WindowSize; }; int GetWindowSize () const { return m_WindowSize; };
int GetRTT () const { return m_RTT; }; int GetRTT () const { return m_RTT; };
@ -202,11 +241,10 @@ namespace stream
uint16_t m_Port; uint16_t m_Port;
std::mutex m_SendBufferMutex; std::mutex m_SendBufferMutex;
std::stringstream m_SendBuffer; SendBufferQueue m_SendBuffer;
int m_WindowSize, m_RTT, m_RTO; int m_WindowSize, m_RTT, m_RTO;
uint64_t m_LastWindowSizeIncreaseTime; uint64_t m_LastWindowSizeIncreaseTime;
int m_NumResendAttempts; int m_NumResendAttempts;
SendHandler m_SendHandler;
}; };
class StreamingDestination: public std::enable_shared_from_this<StreamingDestination> class StreamingDestination: public std::enable_shared_from_this<StreamingDestination>

Loading…
Cancel
Save