Browse Source

Allow for asynchronous creation of streams

pull/129/head
Francisco Blas (klondike) Izquierdo Riera 10 years ago
parent
commit
a906d7f02f
  1. 45
      Destination.cpp
  2. 6
      Destination.h

45
Destination.cpp

@ -1,5 +1,5 @@
#include <algorithm> #include <algorithm>
#include <mutex> #include <cassert>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <cryptopp/dh.h> #include <cryptopp/dh.h>
#include "Log.h" #include "Log.h"
@ -384,47 +384,48 @@ namespace client
} }
} }
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (const std::string& dest, int port) { void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port) {
assert(streamRequestComplete);
i2p::data::IdentHash identHash; i2p::data::IdentHash identHash;
if (i2p::client::context.GetAddressBook ().GetIdentHash (dest, identHash)) if (i2p::client::context.GetAddressBook ().GetIdentHash (dest, identHash))
return CreateStream (identHash, port); CreateStream (streamRequestComplete, identHash, port);
else else
{ {
LogPrint (eLogWarning, "Remote destination ", dest, " not found"); LogPrint (eLogWarning, "Remote destination ", dest, " not found");
return nullptr; streamRequestComplete (nullptr);
} }
} }
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (const i2p::data::IdentHash& dest, int port) { void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) {
assert(streamRequestComplete);
const i2p::data::LeaseSet * leaseSet = FindLeaseSet (dest); const i2p::data::LeaseSet * leaseSet = FindLeaseSet (dest);
if (!leaseSet) if (leaseSet)
streamRequestComplete(CreateStream (*leaseSet, port));
else
{ {
bool found = false;
std::condition_variable newDataReceived;
std::mutex newDataReceivedMutex;
std::unique_lock<std::mutex> l(newDataReceivedMutex);
RequestDestination (dest, RequestDestination (dest,
[&newDataReceived, &found](bool success) [this, streamRequestComplete, dest, port](bool success)
{ {
found = success; if (!success)
newDataReceived.notify_all (); streamRequestComplete (nullptr);
else
{
const i2p::data::LeaseSet * leaseSet = FindLeaseSet (dest);
if (leaseSet)
streamRequestComplete(CreateStream (*leaseSet, port));
else
streamRequestComplete (nullptr);
}
}); });
if (newDataReceived.wait_for (l, std::chrono::seconds (STREAM_REQUEST_TIMEOUT)) == std::cv_status::timeout)
LogPrint (eLogError, "Subscription LeseseSet request timeout expired");
if (found)
leaseSet = FindLeaseSet (dest);
} }
if (leaseSet)
return CreateStream (*leaseSet, port);
else
return nullptr;
} }
std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port) std::shared_ptr<i2p::stream::Stream> ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port)
{ {
if (m_StreamingDestination) if (m_StreamingDestination)
return m_StreamingDestination->CreateNewOutgoingStream (remote, port); return m_StreamingDestination->CreateNewOutgoingStream (remote, port);
return nullptr; else
return nullptr;
} }
void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor)

6
Destination.h

@ -49,6 +49,8 @@ namespace client
RequestComplete requestComplete; RequestComplete requestComplete;
}; };
typedef std::function<void (std::shared_ptr<i2p::stream::Stream> stream)> StreamRequestComplete;
public: public:
ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params = nullptr); ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map<std::string, std::string> * params = nullptr);
@ -65,8 +67,8 @@ namespace client
// streaming // streaming
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
std::shared_ptr<i2p::stream::Stream> CreateStream (const std::string& dest, int port = 0); void CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port = 0);
std::shared_ptr<i2p::stream::Stream> CreateStream (const i2p::data::IdentHash& dest, int port = 0); void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0);
std::shared_ptr<i2p::stream::Stream> CreateStream (const i2p::data::LeaseSet& remote, int port = 0); std::shared_ptr<i2p::stream::Stream> CreateStream (const i2p::data::LeaseSet& remote, int port = 0);
void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor);
void StopAcceptingStreams (); void StopAcceptingStreams ();

Loading…
Cancel
Save