Browse Source

add deferred ready checking for destination

pull/940/head
Jeff Becker 7 years ago
parent
commit
3f409d0e28
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
  1. 72
      libi2pd/Destination.cpp
  2. 10
      libi2pd/Destination.h

72
libi2pd/Destination.cpp

@ -15,7 +15,8 @@ namespace client
LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map<std::string, std::string> * params): LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map<std::string, std::string> * params):
m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic),
m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service),
m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service) m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service),
m_ReadyCheckTimer(m_Service)
{ {
int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH; int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH;
int inQty = DEFAULT_INBOUND_TUNNELS_QUANTITY; int inQty = DEFAULT_INBOUND_TUNNELS_QUANTITY;
@ -118,7 +119,9 @@ namespace client
m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer,
shared_from_this (), std::placeholders::_1)); shared_from_this (), std::placeholders::_1));
m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ())); m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ()));
m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1));
m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer,
shared_from_this (), std::placeholders::_1));
return true; return true;
} }
else else
@ -132,6 +135,7 @@ namespace client
m_CleanupTimer.cancel (); m_CleanupTimer.cancel ();
m_PublishConfirmationTimer.cancel (); m_PublishConfirmationTimer.cancel ();
m_PublishVerificationTimer.cancel (); m_PublishVerificationTimer.cancel ();
m_ReadyCheckTimer.cancel ();
m_IsRunning = false; m_IsRunning = false;
if (m_Pool) if (m_Pool)
@ -231,6 +235,11 @@ namespace client
} }
} }
void LeaseSetDestination::AddReadyCallback(ReadyCallback cb)
{
m_ReadyCallbacks.push_back(cb);
}
void LeaseSetDestination::UpdateLeaseSet () void LeaseSetDestination::UpdateLeaseSet ()
{ {
int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels
@ -676,6 +685,30 @@ namespace client
} }
} }
void LeaseSetDestination::HandleReadyCheckTimer(const boost::system::error_code & ec)
{
if (ec != boost::asio::error::operation_aborted)
{
// TODO: locking ?
if(IsReady())
{
for (auto & itr : m_ReadyCallbacks)
{
itr(ec);
}
m_ReadyCallbacks.clear();
}
}
else
{
for (auto & itr : m_ReadyCallbacks)
{
itr(ec);
}
m_ReadyCallbacks.clear();
}
}
void LeaseSetDestination::CleanupRemoteLeaseSets () void LeaseSetDestination::CleanupRemoteLeaseSets ()
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
@ -812,20 +845,33 @@ namespace client
LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); LogPrint (eLogError, "Destination: request callback is not specified in CreateStream");
return; return;
} }
auto leaseSet = FindLeaseSet (dest); if(IsReady())
if (leaseSet) {
streamRequestComplete(CreateStream (leaseSet, port)); auto leaseSet = FindLeaseSet (dest);
if (leaseSet)
streamRequestComplete(CreateStream (leaseSet, port));
else
{
auto s = GetSharedFromThis ();
RequestDestination (dest,
[s, streamRequestComplete, port](std::shared_ptr<i2p::data::LeaseSet> ls)
{
if (ls)
streamRequestComplete(s->CreateStream (ls, port));
else
streamRequestComplete (nullptr);
});
}
}
else else
{ {
auto s = GetSharedFromThis (); // call if tunnel is not ready
RequestDestination (dest, AddReadyCallback([&](const boost::system::error_code & ec) {
[s, streamRequestComplete, port](std::shared_ptr<i2p::data::LeaseSet> ls) if(ec)
{ streamRequestComplete(nullptr);
if (ls)
streamRequestComplete(s->CreateStream (ls, port));
else else
streamRequestComplete (nullptr); CreateStream(streamRequestComplete, dest, port);
}); });
} }
} }

10
libi2pd/Destination.h

@ -63,6 +63,7 @@ namespace client
public std::enable_shared_from_this<LeaseSetDestination> public std::enable_shared_from_this<LeaseSetDestination>
{ {
typedef std::function<void (std::shared_ptr<i2p::data::LeaseSet> leaseSet)> RequestComplete; typedef std::function<void (std::shared_ptr<i2p::data::LeaseSet> leaseSet)> RequestComplete;
typedef std::function<void (const boost::system::error_code &)> ReadyCallback;
// leaseSet = nullptr means not found // leaseSet = nullptr means not found
struct LeaseSetRequest struct LeaseSetRequest
{ {
@ -108,6 +109,8 @@ namespace client
void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg); void ProcessDeliveryStatusMessage (std::shared_ptr<I2NPMessage> msg);
void SetLeaseSetUpdated (); void SetLeaseSetUpdated ();
void AddReadyCallback(ReadyCallback cb);
protected: protected:
void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet); void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet);
@ -131,7 +134,8 @@ namespace client
void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete); void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete);
bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr<const i2p::data::RouterInfo> nextFloodfill, std::shared_ptr<LeaseSetRequest> request); bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr<const i2p::data::RouterInfo> nextFloodfill, std::shared_ptr<LeaseSetRequest> request);
void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest);
void HandleCleanupTimer (const boost::system::error_code& ecode); void HandleCleanupTimer (const boost::system::error_code& ecode);
void HandleReadyCheckTimer (const boost::system::error_code& ecode);
void CleanupRemoteLeaseSets (); void CleanupRemoteLeaseSets ();
private: private:
@ -152,7 +156,9 @@ namespace client
std::set<i2p::data::IdentHash> m_ExcludedFloodfills; // for publishing std::set<i2p::data::IdentHash> m_ExcludedFloodfills; // for publishing
boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer,
m_PublishDelayTimer, m_CleanupTimer; m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer;
std::vector<ReadyCallback> m_ReadyCallbacks;
public: public:

Loading…
Cancel
Save