Browse Source

stream status

pull/163/head
orignal 10 years ago
parent
commit
dc599bbc63
  1. 41
      Streaming.cpp
  2. 18
      Streaming.h

41
Streaming.cpp

@ -12,9 +12,9 @@ namespace i2p
namespace stream namespace stream
{ {
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, Stream::Stream (boost::asio::io_service& service, StreamingDestination& local,
std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service), m_SendStreamID (0), std::shared_ptr<const i2p::data::LeaseSet> remote, int port): m_Service (service),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service),
m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0)
@ -25,7 +25,7 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): Stream::Stream (boost::asio::io_service& service, StreamingDestination& local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE),
m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0)
@ -95,7 +95,7 @@ namespace stream
} }
// schedule ack for last message // schedule ack for last message
if (m_IsOpen) if (m_Status == eStreamStatusOpen)
{ {
if (!m_IsAckSendScheduled) if (!m_IsAckSendScheduled)
{ {
@ -107,7 +107,7 @@ namespace stream
} }
else if (isSyn) else if (isSyn)
// we have to send SYN back to incoming connection // we have to send SYN back to incoming connection
Send (nullptr, 0); // also sets m_IsOpen SendBuffer (); // also sets m_IsOpen
} }
else else
{ {
@ -201,8 +201,7 @@ namespace stream
if (flags & PACKET_FLAG_CLOSE) if (flags & PACKET_FLAG_CLOSE)
{ {
LogPrint (eLogInfo, "Closed"); LogPrint (eLogInfo, "Closed");
m_IsOpen = false; m_Status = eStreamStatusReset;
m_IsReset = true;
Close (); Close ();
} }
} }
@ -264,7 +263,7 @@ namespace stream
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
if (acknowledged) if (acknowledged)
SendBuffer (); SendBuffer ();
if (!m_IsOpen) if (m_Status == eStreamStatusClosing)
Close (); // all outgoing messages have been sent Close (); // all outgoing messages have been sent
} }
@ -289,7 +288,7 @@ namespace stream
std::vector<Packet *> packets; std::vector<Packet *> packets;
{ {
std::unique_lock<std::mutex> l(m_SendBufferMutex); std::unique_lock<std::mutex> l(m_SendBufferMutex);
while (!m_IsOpen || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0))
{ {
Packet * p = new Packet (); Packet * p = new Packet ();
uint8_t * packet = p->GetBuffer (); uint8_t * packet = p->GetBuffer ();
@ -310,10 +309,10 @@ namespace stream
size++; // NACK count size++; // NACK count
packet[size] = RESEND_TIMEOUT; packet[size] = RESEND_TIMEOUT;
size++; // resend delay size++; // resend delay
if (!m_IsOpen) if (m_Status == eStreamStatusNew)
{ {
// initial packet // initial packet
m_IsOpen = true; m_IsReset = false; m_Status = eStreamStatusOpen;
uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED |
PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED;
if (isNoAck) flags |= PACKET_FLAG_NO_ACK; if (isNoAck) flags |= PACKET_FLAG_NO_ACK;
@ -361,7 +360,7 @@ namespace stream
m_SentPackets.insert (it); m_SentPackets.insert (it);
} }
SendPackets (packets); SendPackets (packets);
if (!m_IsOpen && m_SendBuffer.eof ()) if (m_Status == eStreamStatusClosing && m_SendBuffer.eof ())
SendClose (); SendClose ();
if (isEmpty) if (isEmpty)
ScheduleResend (); ScheduleResend ();
@ -439,15 +438,17 @@ namespace stream
void Stream::Close () void Stream::Close ()
{ {
if (m_IsOpen) if (m_Status == eStreamStatusOpen)
{ {
m_IsOpen = false; m_Status = eStreamStatusClosing;
if (m_SendBuffer.eof ()) // nothing to send if (m_SendBuffer.eof ()) // nothing to send
SendClose (); SendClose ();
} }
if (m_IsReset || m_SentPackets.empty ()) if (m_Status == eStreamStatusReset || m_SentPackets.empty ())
{ {
// closed by peer or everything has been acknowledged // closed by peer or everything has been acknowledged
if (m_Status == eStreamStatusClosing)
SendClose ();
m_ReceiveTimer.cancel (); m_ReceiveTimer.cancel ();
m_LocalDestination.DeleteStream (shared_from_this ()); m_LocalDestination.DeleteStream (shared_from_this ());
} }
@ -458,6 +459,7 @@ namespace stream
void Stream::SendClose () void Stream::SendClose ()
{ {
m_Status = eStreamStatusClosed;
Packet * p = new Packet (); Packet * p = new Packet ();
uint8_t * packet = p->GetBuffer (); uint8_t * packet = p->GetBuffer ();
size_t size = 0; size_t size = 0;
@ -516,7 +518,7 @@ namespace stream
m_AckSendTimer.cancel (); m_AckSendTimer.cancel ();
} }
SendPackets (std::vector<Packet *> { packet }); SendPackets (std::vector<Packet *> { packet });
if (m_IsOpen) if (m_Status == eStreamStatusOpen)
{ {
bool isEmpty = m_SentPackets.empty (); bool isEmpty = m_SentPackets.empty ();
m_SentPackets.insert (packet); m_SentPackets.insert (packet);
@ -602,8 +604,7 @@ namespace stream
else else
{ {
LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate");
m_IsOpen = false; m_Status = eStreamStatusReset;
m_IsReset = true;
Close (); Close ();
return; return;
} }
@ -632,7 +633,7 @@ namespace stream
{ {
if (m_IsAckSendScheduled) if (m_IsAckSendScheduled)
{ {
if (m_IsOpen) if (m_Status == eStreamStatusOpen)
SendQuickAck (); SendQuickAck ();
m_IsAckSendScheduled = false; m_IsAckSendScheduled = false;
} }

18
Streaming.h

@ -83,6 +83,15 @@ namespace stream
return p1->GetSeqn () < p2->GetSeqn (); return p1->GetSeqn () < p2->GetSeqn ();
}; };
}; };
enum StreamStatus
{
eStreamStatusNew,
eStreamStatusOpen,
eStreamStatusReset,
eStreamStatusClosing,
eStreamStatusClosed
};
class StreamingDestination; class StreamingDestination;
class Stream: public std::enable_shared_from_this<Stream> class Stream: public std::enable_shared_from_this<Stream>
@ -98,7 +107,7 @@ namespace stream
uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; };
std::shared_ptr<const i2p::data::LeaseSet> GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; std::shared_ptr<const i2p::data::LeaseSet> GetRemoteLeaseSet () const { return m_RemoteLeaseSet; };
const i2p::data::IdentityEx& GetRemoteIdentity () const { return m_RemoteIdentity; }; const i2p::data::IdentityEx& GetRemoteIdentity () const { return m_RemoteIdentity; };
bool IsOpen () const { return m_IsOpen; }; bool IsOpen () const { return m_Status == eStreamStatusOpen; };
bool IsEstablished () const { return m_SendStreamID; }; bool IsEstablished () const { return m_SendStreamID; };
StreamingDestination& GetLocalDestination () { return m_LocalDestination; }; StreamingDestination& GetLocalDestination () { return m_LocalDestination; };
@ -149,7 +158,8 @@ namespace stream
boost::asio::io_service& m_Service; boost::asio::io_service& m_Service;
uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber;
int32_t m_LastReceivedSequenceNumber; int32_t m_LastReceivedSequenceNumber;
bool m_IsOpen, m_IsReset, m_IsAckSendScheduled; StreamStatus m_Status;
bool m_IsAckSendScheduled;
StreamingDestination& m_LocalDestination; StreamingDestination& m_LocalDestination;
i2p::data::IdentityEx m_RemoteIdentity; i2p::data::IdentityEx m_RemoteIdentity;
std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet; std::shared_ptr<const i2p::data::LeaseSet> m_RemoteLeaseSet;
@ -239,13 +249,13 @@ namespace stream
if (ecode == boost::asio::error::operation_aborted) if (ecode == boost::asio::error::operation_aborted)
{ {
// timeout not expired // timeout not expired
if (m_IsOpen) if (m_Status == eStreamStatusOpen)
// no error // no error
handler (boost::system::error_code (), received); handler (boost::system::error_code (), received);
else else
{ {
// stream closed // stream closed
if (m_IsReset) if (m_Status == eStreamStatusReset)
{ {
// stream closed by peer // stream closed by peer
handler (received > 0 ? boost::system::error_code () : // we still have some data handler (received > 0 ? boost::system::error_code () : // we still have some data

Loading…
Cancel
Save