From a35cdef7388d667a9139611c2880a93ada7555af Mon Sep 17 00:00:00 2001 From: Miguel Freitas Date: Wed, 7 Aug 2013 22:47:34 -0300 Subject: [PATCH] dht_reply_data_done_alert to unlock client waiting for data --- libtorrent/include/libtorrent/alert_types.hpp | 21 +++++++ .../libtorrent/kademlia/dht_tracker.hpp | 3 +- .../include/libtorrent/kademlia/node.hpp | 3 +- libtorrent/src/alert.cpp | 7 +++ libtorrent/src/kademlia/dht_tracker.cpp | 5 +- libtorrent/src/kademlia/node.cpp | 16 ++++- libtorrent/src/session_impl.cpp | 15 ++++- src/twister.cpp | 59 ++++++++++++++++--- 8 files changed, 114 insertions(+), 15 deletions(-) diff --git a/libtorrent/include/libtorrent/alert_types.hpp b/libtorrent/include/libtorrent/alert_types.hpp index 38165b0b..6e967178 100644 --- a/libtorrent/include/libtorrent/alert_types.hpp +++ b/libtorrent/include/libtorrent/alert_types.hpp @@ -1163,6 +1163,27 @@ namespace libtorrent entry::list_type const m_lst; }; + struct TORRENT_EXPORT dht_reply_data_done_alert: alert + { + dht_reply_data_done_alert(std::string const &username, + std::string const &resource, bool multi, + bool is_neighbor, bool got_data) + : m_username(username), m_resource(resource), m_multi(multi), + m_is_neighbor(is_neighbor), m_got_data(got_data) + {} + + TORRENT_DEFINE_ALERT(dht_reply_data_done_alert); + + const static int static_category = alert::dht_notification; + virtual std::string message() const; + + std::string const m_username; + std::string const m_resource; + bool m_multi; + bool m_is_neighbor; + bool m_got_data; + }; + struct TORRENT_EXPORT dht_get_data_alert: alert { dht_get_data_alert(entry const& target, bool possiblyNeighbor, bool hasData) diff --git a/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp b/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp index 972f3d42..5bce8841 100644 --- a/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp +++ b/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp @@ -98,7 +98,8 @@ namespace libtorrent { namespace dht int timeutc, int seq); void getData(std::string const &username, std::string const &resource, bool multi, - boost::function f); + boost::function fdata, + boost::function fdone); void dht_status(session_status& s); void network_stats(int& sent, int& received); diff --git a/libtorrent/include/libtorrent/kademlia/node.hpp b/libtorrent/include/libtorrent/kademlia/node.hpp index 1b07b3ef..23d72438 100644 --- a/libtorrent/include/libtorrent/kademlia/node.hpp +++ b/libtorrent/include/libtorrent/kademlia/node.hpp @@ -253,7 +253,8 @@ public: int timeutc, int seq); void getData(std::string const &username, std::string const &resource, bool multi, - boost::function f); + boost::function fdata, + boost::function fdone); bool verify_token(std::string const& token, char const* info_hash , udp::endpoint const& addr); diff --git a/libtorrent/src/alert.cpp b/libtorrent/src/alert.cpp index 6aa99f2d..a08094c0 100644 --- a/libtorrent/src/alert.cpp +++ b/libtorrent/src/alert.cpp @@ -358,6 +358,13 @@ namespace libtorrent { return msg; } + std::string dht_reply_data_done_alert::message() const + { + char msg[200]; + snprintf(msg, sizeof(msg), "reply of getData done"); + return msg; + } + std::string dht_get_data_alert::message() const { char msg[200]; diff --git a/libtorrent/src/kademlia/dht_tracker.cpp b/libtorrent/src/kademlia/dht_tracker.cpp index 6bfd1ae8..ee1e7e36 100644 --- a/libtorrent/src/kademlia/dht_tracker.cpp +++ b/libtorrent/src/kademlia/dht_tracker.cpp @@ -430,9 +430,10 @@ namespace libtorrent { namespace dht } void dht_tracker::getData(std::string const &username, std::string const &resource, bool multi, - boost::function f) + boost::function fdata, + boost::function fdone) { - m_dht.getData(username, resource, multi, f); + m_dht.getData(username, resource, multi, fdata, fdone); } diff --git a/libtorrent/src/kademlia/node.cpp b/libtorrent/src/kademlia/node.cpp index d2d6b4a6..8ef5490a 100644 --- a/libtorrent/src/kademlia/node.cpp +++ b/libtorrent/src/kademlia/node.cpp @@ -365,6 +365,14 @@ namespace node.m_rpc.invoke(e, i->first.ep(), o); } } + + void getDataDone_fun(std::vector > const& node_results, + bool got_data, node_impl& node, + boost::function fdone) + { + bool is_neighbor = false; + fdone(is_neighbor, got_data); + } } void node_impl::add_router_node(udp::endpoint router) @@ -431,15 +439,17 @@ void node_impl::putData(std::string const &username, std::string const &resource } void node_impl::getData(std::string const &username, std::string const &resource, bool multi, - boost::function f) + boost::function fdata, + boost::function fdone) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "getData [ username: " << info_hash << " res: " << resource << " ]" ; #endif // search for nodes with ids close to id or with peers // for info-hash id. callback is used to return data. - boost::intrusive_ptr ta(new dht_get(*this, username, resource, multi, f, - boost::bind(&nop), false)); + boost::intrusive_ptr ta(new dht_get(*this, username, resource, multi, + fdata, + boost::bind(&getDataDone_fun, _1, _2, boost::ref(*this), fdone), false)); ta->start(); } diff --git a/libtorrent/src/session_impl.cpp b/libtorrent/src/session_impl.cpp index bc0807db..08a1d6c8 100644 --- a/libtorrent/src/session_impl.cpp +++ b/libtorrent/src/session_impl.cpp @@ -5748,9 +5748,22 @@ retry: } } + void getDataDone_fun(aux::session_impl *si, + std::string const &username, std::string const &resource, bool multi, + bool is_neighbor, bool got_data) + { + if( si->m_alerts.should_post() ) { + si->m_alerts.post_alert( + dht_reply_data_done_alert(username, resource, multi, + is_neighbor, got_data)); + } + } + void session_impl::dht_getData(std::string const &username, std::string const &resource, bool multi) { - if (m_dht) m_dht->getData(username, resource, multi, boost::bind( post_dht_getData, this, _1)); + if (m_dht) m_dht->getData(username, resource, multi, + boost::bind( post_dht_getData, this, _1), + boost::bind( getDataDone_fun, this, username, resource, multi, _1, _2)); } void session_impl::on_dht_router_name_lookup(error_code const& e diff --git a/src/twister.cpp b/src/twister.cpp index e999ea08..1cf540e8 100644 --- a/src/twister.cpp +++ b/src/twister.cpp @@ -218,6 +218,8 @@ void ThreadMaintainDHTNodes() void ThreadSessionAlerts() { + static map neighborCheck; + while(!ses) { MilliSleep(200); } @@ -271,14 +273,53 @@ void ThreadSessionAlerts() if (gd) { if( gd->m_possiblyNeighbor ) { - printf("possiblyNeighbor of [%s,%s,%s]\n", - gd->m_target.find_key("n")->string().c_str(), - gd->m_target.find_key("r")->string().c_str(), - gd->m_target.find_key("t")->string().c_str()); + entry const *n = gd->m_target.find_key("n"); + entry const *r = gd->m_target.find_key("r"); + entry const *t = gd->m_target.find_key("t"); + + if( n && n->type() == entry::string_t && + r && r->type() == entry::string_t && + t && t->type() == entry::string_t) { + + // now we do our own search to make sure we are really close to this target + uint256 th = dhtTargetHash(n->string(), r->string(), t->string()); + + if( !neighborCheck.count(th) ) { + printf("possiblyNeighbor of [%s,%s,%s]\n", + n->string().c_str(), + r->string().c_str(), + t->string().c_str()); + + neighborCheck[th] = gd->m_target; + ses->dht_getData(n->string(), r->string(), t->string() == "m"); + } + } + } continue; } + dht_reply_data_done_alert const* dd = alert_cast(*i); + if (dd) + { + printf("get_data_gone [%s,%s,%s] is_neighbor=%d got_data=%d\n", + dd->m_username.c_str(), dd->m_resource.c_str(), dd->m_multi ? "m" : "s", + dd->m_is_neighbor, dd->m_got_data); + + uint256 th = dhtTargetHash(dd->m_username, dd->m_resource, dd->m_multi ? "m" : "s"); + + { + LOCK(cs_dhtgetMap); + std::map::iterator mi = m_dhtgetMap.find(th); + if( mi != m_dhtgetMap.end() && !dd->m_got_data ) { + // post alert to return from wait_for_alert in dhtget() + alert_manager *am = (*mi).second; + am->post_alert(*dd); + } + } + + continue; + } /* save_resume_data_alert const* rd = alert_cast(*i); @@ -500,13 +541,17 @@ Value dhtget(const Array& params, bool fHelp) ses->dht_getData(strUsername, strResource, multi); - Value ret; + Value ret = Array(); - if( am.wait_for_alert(seconds(10)) ) { + if( am.wait_for_alert(seconds(20)) ) { std::auto_ptr a(am.get()); dht_reply_data_alert const* rd = alert_cast(&(*a)); - ret = entryToJson(rd->m_lst); + if( rd ) { + ret = entryToJson(rd->m_lst); + } else { + // cast failed => dht_reply_data_done_alert => no data + } } {