diff --git a/Streaming.cpp b/Streaming.cpp index 3c9edbb3..448b8c10 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -12,9 +12,9 @@ namespace i2p namespace stream { Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, - std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), - m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), - m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), + std::shared_ptr remote, int port): m_Service (service), + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) @@ -25,7 +25,7 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), - m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_LastWindowSizeIncreaseTime (0) @@ -95,7 +95,7 @@ namespace stream } // schedule ack for last message - if (m_IsOpen) + if (m_Status == eStreamStatusOpen) { if (!m_IsAckSendScheduled) { @@ -107,7 +107,7 @@ namespace stream } else if (isSyn) // we have to send SYN back to incoming connection - Send (nullptr, 0); // also sets m_IsOpen + SendBuffer (); // also sets m_IsOpen } else { @@ -201,8 +201,7 @@ namespace stream if (flags & PACKET_FLAG_CLOSE) { LogPrint (eLogInfo, "Closed"); - m_IsOpen = false; - m_IsReset = true; + m_Status = eStreamStatusReset; Close (); } } @@ -264,7 +263,7 @@ namespace stream m_ResendTimer.cancel (); if (acknowledged) SendBuffer (); - if (!m_IsOpen) + if (m_Status == eStreamStatusClosing) Close (); // all outgoing messages have been sent } @@ -289,7 +288,7 @@ namespace stream std::vector packets; { std::unique_lock l(m_SendBufferMutex); - while (!m_IsOpen || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) + while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) { Packet * p = new Packet (); uint8_t * packet = p->GetBuffer (); @@ -310,10 +309,10 @@ namespace stream size++; // NACK count packet[size] = RESEND_TIMEOUT; size++; // resend delay - if (!m_IsOpen) + if (m_Status == eStreamStatusNew) { // initial packet - m_IsOpen = true; m_IsReset = false; + m_Status = eStreamStatusOpen; uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; if (isNoAck) flags |= PACKET_FLAG_NO_ACK; @@ -361,7 +360,7 @@ namespace stream m_SentPackets.insert (it); } SendPackets (packets); - if (!m_IsOpen && m_SendBuffer.eof ()) + if (m_Status == eStreamStatusClosing && m_SendBuffer.eof ()) SendClose (); if (isEmpty) ScheduleResend (); @@ -439,15 +438,17 @@ namespace stream void Stream::Close () { - if (m_IsOpen) + if (m_Status == eStreamStatusOpen) { - m_IsOpen = false; + m_Status = eStreamStatusClosing; if (m_SendBuffer.eof ()) // nothing to send SendClose (); } - if (m_IsReset || m_SentPackets.empty ()) + if (m_Status == eStreamStatusReset || m_SentPackets.empty ()) { // closed by peer or everything has been acknowledged + if (m_Status == eStreamStatusClosing) + SendClose (); m_ReceiveTimer.cancel (); m_LocalDestination.DeleteStream (shared_from_this ()); } @@ -458,6 +459,7 @@ namespace stream void Stream::SendClose () { + m_Status = eStreamStatusClosed; Packet * p = new Packet (); uint8_t * packet = p->GetBuffer (); size_t size = 0; @@ -516,7 +518,7 @@ namespace stream m_AckSendTimer.cancel (); } SendPackets (std::vector { packet }); - if (m_IsOpen) + if (m_Status == eStreamStatusOpen) { bool isEmpty = m_SentPackets.empty (); m_SentPackets.insert (packet); @@ -602,8 +604,7 @@ namespace stream else { LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); - m_IsOpen = false; - m_IsReset = true; + m_Status = eStreamStatusReset; Close (); return; } @@ -632,7 +633,7 @@ namespace stream { if (m_IsAckSendScheduled) { - if (m_IsOpen) + if (m_Status == eStreamStatusOpen) SendQuickAck (); m_IsAckSendScheduled = false; } diff --git a/Streaming.h b/Streaming.h index 73db7692..73508a71 100644 --- a/Streaming.h +++ b/Streaming.h @@ -83,6 +83,15 @@ namespace stream return p1->GetSeqn () < p2->GetSeqn (); }; }; + + enum StreamStatus + { + eStreamStatusNew, + eStreamStatusOpen, + eStreamStatusReset, + eStreamStatusClosing, + eStreamStatusClosed + }; class StreamingDestination; class Stream: public std::enable_shared_from_this @@ -98,7 +107,7 @@ namespace stream uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; std::shared_ptr GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; const i2p::data::IdentityEx& GetRemoteIdentity () const { return m_RemoteIdentity; }; - bool IsOpen () const { return m_IsOpen; }; + bool IsOpen () const { return m_Status == eStreamStatusOpen; }; bool IsEstablished () const { return m_SendStreamID; }; StreamingDestination& GetLocalDestination () { return m_LocalDestination; }; @@ -149,7 +158,8 @@ namespace stream boost::asio::io_service& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; int32_t m_LastReceivedSequenceNumber; - bool m_IsOpen, m_IsReset, m_IsAckSendScheduled; + StreamStatus m_Status; + bool m_IsAckSendScheduled; StreamingDestination& m_LocalDestination; i2p::data::IdentityEx m_RemoteIdentity; std::shared_ptr m_RemoteLeaseSet; @@ -239,13 +249,13 @@ namespace stream if (ecode == boost::asio::error::operation_aborted) { // timeout not expired - if (m_IsOpen) + if (m_Status == eStreamStatusOpen) // no error handler (boost::system::error_code (), received); else { // stream closed - if (m_IsReset) + if (m_Status == eStreamStatusReset) { // stream closed by peer handler (received > 0 ? boost::system::error_code () : // we still have some data