Browse Source

separate thread for streaming

pull/48/head
orignal 11 years ago
parent
commit
4bd42625fd
  1. 2
      Garlic.cpp
  2. 83
      Streaming.cpp
  3. 39
      Streaming.h

2
Garlic.cpp

@ -392,7 +392,7 @@ namespace garlic
// later on we should let destination decide // later on we should let destination decide
I2NPHeader * header = (I2NPHeader *)buf; I2NPHeader * header = (I2NPHeader *)buf;
if (header->typeID == eI2NPData) if (header->typeID == eI2NPData)
i2p::stream::HandleDataMessage (&destination, buf + sizeof (I2NPHeader), be16toh (header->size)); i2p::stream::HandleDataMessage (destination, buf + sizeof (I2NPHeader), be16toh (header->size));
else else
LogPrint ("Unexpected I2NP garlic message ", (int)header->typeID); LogPrint ("Unexpected I2NP garlic message ", (int)header->typeID);
break; break;

83
Streaming.cpp

@ -14,10 +14,11 @@ namespace i2p
{ {
namespace stream namespace stream
{ {
Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote): Stream::Stream (boost::asio::io_service& service, StreamingDestination * local,
m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false),
m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote),
m_OutboundTunnel (nullptr)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -312,7 +313,6 @@ namespace stream
m_CurrentRemoteLease.endDate = 0; m_CurrentRemoteLease.endDate = 0;
} }
StreamingDestination * sharedLocalDestination = nullptr;
StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr) StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr)
{ {
@ -354,9 +354,10 @@ namespace stream
} }
} }
Stream * StreamingDestination::CreateNewStream (const i2p::data::LeaseSet& remote) Stream * StreamingDestination::CreateNewStream (boost::asio::io_service& service,
const i2p::data::LeaseSet& remote)
{ {
Stream * s = new Stream (this, remote); Stream * s = new Stream (service, this, remote);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
@ -436,31 +437,77 @@ namespace stream
signer.SignMessage (i2p::context.GetRandomNumberGenerator (), buf, len, signature); signer.SignMessage (i2p::context.GetRandomNumberGenerator (), buf, len, signature);
} }
StreamingDestinations destinations;
void StreamingDestinations::Start ()
{
if (!m_SharedLocalDestination)
m_SharedLocalDestination = new StreamingDestination ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this));
}
void StreamingDestinations::Stop ()
{
delete m_SharedLocalDestination;
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
void StreamingDestinations::Run ()
{
m_Service.run ();
}
Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote)
{
if (!m_SharedLocalDestination) return nullptr;
return m_SharedLocalDestination->CreateNewStream (m_Service, remote);
}
void StreamingDestinations::DeleteClientStream (Stream * stream)
{
if (m_SharedLocalDestination)
m_SharedLocalDestination->DeleteStream (stream);
else
delete stream;
}
void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
// TODO: we have onle one destination, might be more
if (m_SharedLocalDestination)
m_SharedLocalDestination->HandleNextPacket (packet);
}
Stream * CreateStream (const i2p::data::LeaseSet& remote) Stream * CreateStream (const i2p::data::LeaseSet& remote)
{ {
if (!sharedLocalDestination) return destinations.CreateClientStream (remote);
sharedLocalDestination = new StreamingDestination ();
return sharedLocalDestination->CreateNewStream (remote);
} }
void DeleteStream (Stream * stream) void DeleteStream (Stream * stream)
{ {
if (sharedLocalDestination) destinations.DeleteClientStream (stream);
sharedLocalDestination->DeleteStream (stream);
} }
void StartStreaming () void StartStreaming ()
{ {
if (!sharedLocalDestination) destinations.Start ();
sharedLocalDestination = new StreamingDestination ();
} }
void StopStreaming () void StopStreaming ()
{ {
delete sharedLocalDestination; destinations.Stop ();
} }
void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len) void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len)
{ {
uint32_t length = be32toh (*(uint32_t *)buf); uint32_t length = be32toh (*(uint32_t *)buf);
buf += 4; buf += 4;
@ -481,9 +528,7 @@ namespace stream
} }
decompressor.Get (uncompressed->buf, uncompressed->len); decompressor.Get (uncompressed->buf, uncompressed->len);
// then forward to streaming engine // then forward to streaming engine
// TODO: we have onle one destination, might be more destinations.HandleNextPacket (destination, uncompressed);
if (sharedLocalDestination)
sharedLocalDestination->HandleNextPacket (uncompressed);
} }
else else
LogPrint ("Data: protocol ", buf[9], " is not supported"); LogPrint ("Data: protocol ", buf[9], " is not supported");

39
Streaming.h

@ -5,6 +5,8 @@
#include <string> #include <string>
#include <map> #include <map>
#include <set> #include <set>
#include <thread>
#include <boost/asio.hpp>
#include <cryptopp/dsa.h> #include <cryptopp/dsa.h>
#include "I2PEndian.h" #include "I2PEndian.h"
#include "Queue.h" #include "Queue.h"
@ -67,7 +69,7 @@ namespace stream
{ {
public: public:
Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote); Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote);
~Stream (); ~Stream ();
uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetSendStreamID () const { return m_SendStreamID; };
uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; };
@ -95,6 +97,7 @@ namespace stream
private: private:
boost::asio::io_service& m_Service;
uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber;
bool m_IsOpen, m_LeaseSetUpdated; bool m_IsOpen, m_LeaseSetUpdated;
StreamingDestination * m_LocalDestination; StreamingDestination * m_LocalDestination;
@ -119,7 +122,7 @@ namespace stream
i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; };
void Sign (uint8_t * buf, int len, uint8_t * signature) const; void Sign (uint8_t * buf, int len, uint8_t * signature) const;
Stream * CreateNewStream (const i2p::data::LeaseSet& remote); Stream * CreateNewStream (boost::asio::io_service& service, const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream); void DeleteStream (Stream * stream);
void HandleNextPacket (Packet * packet); void HandleNextPacket (Packet * packet);
@ -142,13 +145,43 @@ namespace stream
CryptoPP::DSA::PrivateKey m_SigningPrivateKey; CryptoPP::DSA::PrivateKey m_SigningPrivateKey;
}; };
class StreamingDestinations
{
public:
StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr),
m_Work (m_Service), m_SharedLocalDestination (nullptr) {};
~StreamingDestinations () {};
void Start ();
void Stop ();
void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet);
Stream * CreateClientStream (const i2p::data::LeaseSet& remote);
void DeleteClientStream (Stream * stream);
private:
void Run ();
private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
StreamingDestination * m_SharedLocalDestination;
};
Stream * CreateStream (const i2p::data::LeaseSet& remote); Stream * CreateStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream); void DeleteStream (Stream * stream);
void StartStreaming (); void StartStreaming ();
void StopStreaming (); void StopStreaming ();
// assuming data is I2CP message // assuming data is I2CP message
void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len); void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len);
} }
} }

Loading…
Cancel
Save