diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index c6447b5f..be3d09d8 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -490,7 +490,7 @@ namespace stream handler(boost::system::error_code ()); m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ())); } - + void Stream::SendBuffer () { int numMsgs = m_WindowSize - m_SentPackets.size (); @@ -665,6 +665,42 @@ namespace stream LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); } + void Stream::SendPing () + { + Packet p; + uint8_t * packet = p.GetBuffer (); + size_t size = 0; + htobe32buf (packet, m_RecvStreamID); + size += 4; // sendStreamID + memset (packet + size, 0, 14); + size += 14; // all zeroes + uint16_t flags = PACKET_FLAG_ECHO | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_FROM_INCLUDED; + bool isOfflineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().IsOfflineSignature (); + if (isOfflineSignature) flags |= PACKET_FLAG_OFFLINE_SIGNATURE; + htobe16buf (packet + size, flags); + size += 2; // flags + size_t identityLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetFullLen (); + size_t signatureLen = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetSignatureLen (); + uint8_t * optionsSize = packet + size; // set options size later + size += 2; // options size + m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); + size += identityLen; // from + if (isOfflineSignature) + { + const auto& offlineSignature = m_LocalDestination.GetOwner ()->GetPrivateKeys ().GetOfflineSignature (); + memcpy (packet + size, offlineSignature.data (), offlineSignature.size ()); + size += offlineSignature.size (); // offline signature + } + uint8_t * signature = packet + size; // set it later + memset (signature, 0, signatureLen); // zeroes for now + size += signatureLen; // signature + htobe16buf (optionsSize, packet + size - 2 - optionsSize); // actual options size + m_LocalDestination.GetOwner ()->Sign (packet, size, signature); + p.len = size; + SendPackets (std::vector { &p }); + LogPrint (eLogDebug, "Streaming: Ping of ", p.len, " bytes sent"); + } + void Stream::Close () { LogPrint(eLogDebug, "Streaming: closing stream with sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ", status=", m_Status); @@ -1103,6 +1139,13 @@ namespace stream } else { + if (packet->IsEcho ()) + { + // pong + LogPrint (eLogInfo, "Streaming: Pong received rSID=", packet->GetReceiveStreamID ()); + DeletePacket (packet); + return; + } if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream { uint32_t receiveStreamID = packet->GetReceiveStreamID (); @@ -1197,6 +1240,12 @@ namespace stream return s; } + void StreamingDestination::SendPing (std::shared_ptr remote) + { + auto s = std::make_shared (m_Owner->GetService (), *this, remote, 0); + s->SendPing (); + } + std::shared_ptr StreamingDestination::CreateNewIncomingStream (uint32_t receiveStreamID) { auto s = std::make_shared (m_Owner->GetService (), *this); diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 189b1a64..c13dcab1 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -179,7 +179,8 @@ namespace stream void HandlePing (Packet * packet); size_t Send (const uint8_t * buf, size_t len); void AsyncSend (const uint8_t * buf, size_t len, SendHandler handler); - + void SendPing (); + template void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; @@ -268,6 +269,7 @@ namespace stream void Stop (); std::shared_ptr CreateNewOutgoingStream (std::shared_ptr remote, int port = 0); + void SendPing (std::shared_ptr remote); void DeleteStream (std::shared_ptr stream); bool DeleteStream (uint32_t recvStreamID); void SetAcceptor (const Acceptor& acceptor);