diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index e8386b61..79fcbc02 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -217,25 +217,57 @@ namespace stream void Stream::ProcessPacket (Packet * packet) { - // process flags uint32_t receivedSeqn = packet->GetSeqn (); uint16_t flags = packet->GetFlags (); LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags); - const uint8_t * optionData = packet->GetOptionData (); + if (!ProcessOptions (flags, packet)) + { + m_LocalDestination.DeletePacket (packet); + Terminate (); + return; + } + + packet->offset = packet->GetPayload () - packet->buf; + if (packet->GetLength () > 0) + { + m_ReceiveQueue.push (packet); + m_ReceiveTimer.cancel (); + } + else + m_LocalDestination.DeletePacket (packet); + m_LastReceivedSequenceNumber = receivedSeqn; + + if (flags & PACKET_FLAG_RESET) + { + LogPrint (eLogDebug, "Streaming: closing stream sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ": reset flag received in packet #", receivedSeqn); + m_Status = eStreamStatusReset; + Close (); + } + else if (flags & PACKET_FLAG_CLOSE) + { + if (m_Status != eStreamStatusClosed) + SendClose (); + m_Status = eStreamStatusClosed; + Terminate (); + } + } + + bool Stream::ProcessOptions (uint16_t flags, Packet * packet) + { + const uint8_t * optionData = packet->GetOptionData (); + size_t optionSize = packet->GetOptionSize (); if (flags & PACKET_FLAG_DELAY_REQUESTED) optionData += 2; if (flags & PACKET_FLAG_FROM_INCLUDED) { - m_RemoteIdentity = std::make_shared(optionData, packet->GetOptionSize ()); + m_RemoteIdentity = std::make_shared(optionData, optionSize); if (m_RemoteIdentity->IsRSA ()) { LogPrint (eLogInfo, "Streaming: Incoming stream from RSA destination ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " Discarded"); - m_LocalDestination.DeletePacket (packet); - Terminate (); - return; + return false; } optionData += m_RemoteIdentity->GetFullLen (); if (!m_RemoteLeaseSet) @@ -249,6 +281,42 @@ namespace stream optionData += 2; } + if (flags & PACKET_FLAG_OFFLINE_SIGNATURE) + { + if (!m_RemoteIdentity) + { + LogPrint (eLogInfo, "Streaming: offline signature without identity"); + return false; + } + const uint8_t * signedData = optionData; + uint32_t expiresTimestamp = bufbe32toh (optionData); optionData += 4; // expires timestamp + if (expiresTimestamp < i2p::util::GetSecondsSinceEpoch ()) + { + LogPrint (eLogInfo, "Streaming: offline signature transient key expired"); + return false; + } + uint16_t keyType = bufbe16toh (optionData); optionData += 2; // key type + m_TransientVerifier.reset (i2p::data::IdentityEx::CreateVerifier (keyType)); + if (!m_TransientVerifier) + { + LogPrint (eLogInfo, "Streaming: Unknown transient key type ", (int)keyType); + return false; + } + auto keyLen = m_TransientVerifier->GetPublicKeyLen (); + if ((optionData - packet->GetOptionData ()) + keyLen + m_RemoteIdentity->GetSignatureLen () > optionSize) + { + LogPrint (eLogInfo, "Streaming: Option data is too short ", (int)optionSize); + return false; + } + m_TransientVerifier->SetPublicKey (optionData); optionData += keyLen; // key + if (!m_RemoteIdentity->Verify (signedData, keyLen + 6, optionData)) + { + LogPrint (eLogError, "Streaming: Transient key signature verification failed"); + return false; + } + optionData += m_RemoteIdentity->GetSignatureLen (); // signature + } + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { uint8_t signature[256]; @@ -257,7 +325,10 @@ namespace stream { memcpy (signature, optionData, signatureLen); memset (const_cast(optionData), 0, signatureLen); - if (!m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature)) + bool verified = m_TransientVerifier ? + m_TransientVerifier->Verify (packet->GetBuffer (), packet->GetLength (), signature) : + m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature); + if (!verified) { LogPrint (eLogError, "Streaming: Signature verification failed, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); Close (); @@ -268,34 +339,11 @@ namespace stream } else { - LogPrint(eLogError, "Streaming: Signature too big, ", signatureLen, " bytes"); + LogPrint (eLogError, "Streaming: Signature too big, ", signatureLen, " bytes"); + return false; } } - - packet->offset = packet->GetPayload () - packet->buf; - if (packet->GetLength () > 0) - { - m_ReceiveQueue.push (packet); - m_ReceiveTimer.cancel (); - } - else - m_LocalDestination.DeletePacket (packet); - - m_LastReceivedSequenceNumber = receivedSeqn; - - if (flags & PACKET_FLAG_RESET) - { - LogPrint (eLogDebug, "Streaming: closing stream sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID, ": reset flag received in packet #", receivedSeqn); - m_Status = eStreamStatusReset; - Close (); - } - else if (flags & PACKET_FLAG_CLOSE) - { - if (m_Status != eStreamStatusClosed) - SendClose (); - m_Status = eStreamStatusClosed; - Terminate (); - } + return true; } void Stream::ProcessAck (Packet * packet) diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 3db8d760..56f6b0e0 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -38,6 +38,7 @@ namespace stream const uint16_t PACKET_FLAG_PROFILE_INTERACTIVE = 0x0100; const uint16_t PACKET_FLAG_ECHO = 0x0200; const uint16_t PACKET_FLAG_NO_ACK = 0x0400; + const uint16_t PACKET_FLAG_OFFLINE_SIGNATURE = 0x0800; const size_t STREAMING_MTU = 1730; const size_t MAX_PACKET_SIZE = 4096; @@ -195,6 +196,7 @@ namespace stream void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); + bool ProcessOptions (uint16_t flags, Packet * packet); void ProcessAck (Packet * packet); size_t ConcatenatePackets (uint8_t * buf, size_t len); @@ -216,6 +218,7 @@ namespace stream bool m_IsAckSendScheduled; StreamingDestination& m_LocalDestination; std::shared_ptr m_RemoteIdentity; + std::unique_ptr m_TransientVerifier; // in case of offline key std::shared_ptr m_RemoteLeaseSet; std::shared_ptr m_RoutingSession; std::shared_ptr m_CurrentRemoteLease;