Browse Source

* sane log messages: Streaming.cpp

pull/317/head
hagen 9 years ago
parent
commit
642d0e6f74
  1. 64
      Streaming.cpp

64
Streaming.cpp

@ -53,7 +53,7 @@ namespace stream
delete it; delete it;
m_SavedPackets.clear (); m_SavedPackets.clear ();
LogPrint (eLogDebug, "Stream deleted"); LogPrint (eLogDebug, "Streaming: Stream deleted");
} }
void Stream::Terminate () void Stream::Terminate ()
@ -83,12 +83,12 @@ namespace stream
if (!receivedSeqn && !isSyn) if (!receivedSeqn && !isSyn)
{ {
// plain ack // plain ack
LogPrint (eLogDebug, "Plain ACK received"); LogPrint (eLogDebug, "Streaming: Plain ACK received");
delete packet; delete packet;
return; return;
} }
LogPrint (eLogDebug, "Received seqn=", receivedSeqn); LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn);
if (isSyn || receivedSeqn == m_LastReceivedSequenceNumber + 1) if (isSyn || receivedSeqn == m_LastReceivedSequenceNumber + 1)
{ {
// we have received next in sequence message // we have received next in sequence message
@ -128,13 +128,13 @@ namespace stream
if (receivedSeqn <= m_LastReceivedSequenceNumber) if (receivedSeqn <= m_LastReceivedSequenceNumber)
{ {
// we have received duplicate // we have received duplicate
LogPrint (eLogWarning, "Duplicate message ", receivedSeqn, " received"); LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " received");
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped delete packet; // packet dropped
} }
else else
{ {
LogPrint (eLogWarning, "Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); LogPrint (eLogWarning, "Streaming: Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1);
// save message and wait for missing message again // save message and wait for missing message again
SavePacket (packet); SavePacket (packet);
if (m_LastReceivedSequenceNumber >= 0) if (m_LastReceivedSequenceNumber >= 0)
@ -169,43 +169,36 @@ namespace stream
// process flags // process flags
uint32_t receivedSeqn = packet->GetSeqn (); uint32_t receivedSeqn = packet->GetSeqn ();
uint16_t flags = packet->GetFlags (); uint16_t flags = packet->GetFlags ();
LogPrint (eLogDebug, "Process seqn=", receivedSeqn, ", flags=", flags); LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags);
const uint8_t * optionData = packet->GetOptionData (); const uint8_t * optionData = packet->GetOptionData ();
if (flags & PACKET_FLAG_SYNCHRONIZE)
LogPrint (eLogDebug, "Synchronize");
if (flags & PACKET_FLAG_DELAY_REQUESTED) if (flags & PACKET_FLAG_DELAY_REQUESTED)
{
optionData += 2; optionData += 2;
}
if (flags & PACKET_FLAG_FROM_INCLUDED) if (flags & PACKET_FLAG_FROM_INCLUDED)
{ {
m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, packet->GetOptionSize ()); m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, packet->GetOptionSize ());
optionData += m_RemoteIdentity->GetFullLen (); optionData += m_RemoteIdentity->GetFullLen ();
LogPrint (eLogInfo, "From identity ", m_RemoteIdentity->GetIdentHash ().ToBase64 ());
if (!m_RemoteLeaseSet) if (!m_RemoteLeaseSet)
LogPrint (eLogDebug, "Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 ()); LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 ());
} }
if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED)
{ {
uint16_t maxPacketSize = bufbe16toh (optionData); uint16_t maxPacketSize = bufbe16toh (optionData);
LogPrint (eLogDebug, "Max packet size ", maxPacketSize);
optionData += 2; optionData += 2;
} }
if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) if (flags & PACKET_FLAG_SIGNATURE_INCLUDED)
{ {
LogPrint (eLogDebug, "Signature");
uint8_t signature[256]; uint8_t signature[256];
auto signatureLen = m_RemoteIdentity->GetSignatureLen (); auto signatureLen = m_RemoteIdentity->GetSignatureLen ();
memcpy (signature, optionData, signatureLen); memcpy (signature, optionData, signatureLen);
memset (const_cast<uint8_t *>(optionData), 0, signatureLen); memset (const_cast<uint8_t *>(optionData), 0, signatureLen);
if (!m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature)) if (!m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature))
{ {
LogPrint (eLogError, "Signature verification failed"); LogPrint (eLogError, "Streaming: Signature verification failed");
Close (); Close ();
flags |= PACKET_FLAG_CLOSE; flags |= PACKET_FLAG_CLOSE;
} }
@ -226,7 +219,6 @@ namespace stream
if (flags & (PACKET_FLAG_CLOSE | PACKET_FLAG_RESET)) if (flags & (PACKET_FLAG_CLOSE | PACKET_FLAG_RESET))
{ {
LogPrint (eLogInfo, (flags & PACKET_FLAG_RESET) ? "Reset" : "Closed");
m_Status = eStreamStatusReset; m_Status = eStreamStatusReset;
Close (); Close ();
} }
@ -254,7 +246,7 @@ namespace stream
} }
if (nacked) if (nacked)
{ {
LogPrint (eLogDebug, "Packet ", seqn, " NACK"); LogPrint (eLogDebug, "Streaming: Packet ", seqn, " NACK");
it++; it++;
continue; continue;
} }
@ -418,7 +410,7 @@ namespace stream
} }
if (lastReceivedSeqn < 0) if (lastReceivedSeqn < 0)
{ {
LogPrint (eLogError, "No packets have been received yet"); LogPrint (eLogError, "Streaming: No packets have been received yet");
return; return;
} }
@ -474,7 +466,7 @@ namespace stream
p.len = size; p.len = size;
SendPackets (std::vector<Packet *> { &p }); SendPackets (std::vector<Packet *> { &p });
LogPrint ("Quick Ack sent. ", (int)numNacks, " NACKs"); LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs");
} }
void Stream::Close () void Stream::Close ()
@ -485,7 +477,7 @@ namespace stream
m_Status = eStreamStatusClosing; m_Status = eStreamStatusClosing;
Close (); // recursion Close (); // recursion
if (m_Status == eStreamStatusClosing) //still closing if (m_Status == eStreamStatusClosing) //still closing
LogPrint (eLogInfo, "Trying to send stream data before closing"); LogPrint (eLogInfo, "Streaming: Trying to send stream data before closing");
break; break;
case eStreamStatusReset: case eStreamStatusReset:
SendClose (); SendClose ();
@ -507,7 +499,7 @@ namespace stream
m_LocalDestination.DeleteStream (shared_from_this ()); m_LocalDestination.DeleteStream (shared_from_this ());
break; break;
default: default:
LogPrint (eLogWarning, "Unexpected stream status ", (int)m_Status); LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status);
}; };
} }
@ -539,7 +531,7 @@ namespace stream
p->len = size; p->len = size;
m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p));
LogPrint ("FIN sent"); LogPrint (eLogDebug, "Streaming: FIN sent");
} }
size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len)
@ -593,7 +585,7 @@ namespace stream
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
if (!m_RemoteLeaseSet) if (!m_RemoteLeaseSet)
{ {
LogPrint (eLogError, "Can't send packets. Missing remote LeaseSet"); LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet");
return; return;
} }
} }
@ -601,7 +593,7 @@ namespace stream
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel);
if (!m_CurrentOutboundTunnel) if (!m_CurrentOutboundTunnel)
{ {
LogPrint (eLogError, "No outbound tunnels in the pool"); LogPrint (eLogError, "Streaming: No outbound tunnels in the pool");
return; return;
} }
@ -625,7 +617,7 @@ namespace stream
m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
} }
else else
LogPrint (eLogWarning, "All leases are expired"); LogPrint (eLogWarning, "Streaming: All leases are expired");
} }
@ -644,7 +636,7 @@ namespace stream
// check for resend attempts // check for resend attempts
if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS)
{ {
LogPrint (eLogWarning, "Stream packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate");
m_Status = eStreamStatusReset; m_Status = eStreamStatusReset;
Close (); Close ();
return; return;
@ -678,12 +670,12 @@ namespace stream
// no break here // no break here
case 4: case 4:
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
LogPrint (eLogWarning, "Another remote lease has been selected for stream"); LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream");
break; break;
case 3: case 3:
// pick another outbound tunnel // pick another outbound tunnel
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
LogPrint (eLogWarning, "Another outbound tunnel has been selected for stream"); LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream");
break; break;
default: ; default: ;
} }
@ -699,7 +691,7 @@ namespace stream
{ {
if (m_LastReceivedSequenceNumber < 0) if (m_LastReceivedSequenceNumber < 0)
{ {
LogPrint (eLogWarning, "SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on. Terminate"); LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate");
m_Status = eStreamStatusReset; m_Status = eStreamStatusReset;
Close (); Close ();
return; return;
@ -716,7 +708,7 @@ namespace stream
{ {
m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
if (!m_RemoteLeaseSet) if (!m_RemoteLeaseSet)
LogPrint ("LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found"); LogPrint (eLogError, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found");
} }
if (m_RemoteLeaseSet) if (m_RemoteLeaseSet)
{ {
@ -819,7 +811,7 @@ namespace stream
it->second->HandleNextPacket (packet); it->second->HandleNextPacket (packet);
else else
{ {
LogPrint ("Unknown stream sendStreamID=", sendStreamID); LogPrint (eLogError, "Streaming: Unknown stream sendStreamID=", sendStreamID);
delete packet; delete packet;
} }
} }
@ -833,7 +825,7 @@ namespace stream
m_Acceptor (incomingStream); m_Acceptor (incomingStream);
else else
{ {
LogPrint (eLogInfo, "Acceptor for incoming stream is not set"); LogPrint (eLogWarning, "Streaming: Acceptor for incoming stream is not set");
if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG) if (m_PendingIncomingStreams.size () < MAX_PENDING_INCOMING_BACKLOG)
{ {
m_PendingIncomingStreams.push_back (incomingStream); m_PendingIncomingStreams.push_back (incomingStream);
@ -841,11 +833,11 @@ namespace stream
m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT));
m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer,
this, std::placeholders::_1)); this, std::placeholders::_1));
LogPrint (eLogInfo, "Pending incoming stream added"); LogPrint (eLogDebug, "Streaming: Pending incoming stream added");
} }
else else
{ {
LogPrint (eLogError, "Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG); LogPrint (eLogWarning, "Streaming: Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG);
incomingStream->Close (); incomingStream->Close ();
} }
} }
@ -861,7 +853,7 @@ namespace stream
return; return;
} }
// TODO: should queue it up // TODO: should queue it up
LogPrint ("Unknown stream receiveStreamID=", receiveStreamID); LogPrint (eLogError, "Streaming: Unknown stream receiveStreamID=", receiveStreamID);
delete packet; delete packet;
} }
} }
@ -917,7 +909,7 @@ namespace stream
{ {
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
LogPrint (eLogInfo, "Pending incoming timeout expired"); LogPrint (eLogWarning, "Streaming: Pending incoming timeout expired");
for (auto it: m_PendingIncomingStreams) for (auto it: m_PendingIncomingStreams)
it->Close (); it->Close ();
m_PendingIncomingStreams.clear (); m_PendingIncomingStreams.clear ();

Loading…
Cancel
Save