dht_reply_data_done_alert to unlock client waiting for data

This commit is contained in:
Miguel Freitas 2013-08-07 22:47:34 -03:00
parent 3a1d772643
commit a35cdef738
8 changed files with 114 additions and 15 deletions

View File

@ -1163,6 +1163,27 @@ namespace libtorrent
entry::list_type const m_lst;
};
struct TORRENT_EXPORT dht_reply_data_done_alert: alert
{
dht_reply_data_done_alert(std::string const &username,
std::string const &resource, bool multi,
bool is_neighbor, bool got_data)
: m_username(username), m_resource(resource), m_multi(multi),
m_is_neighbor(is_neighbor), m_got_data(got_data)
{}
TORRENT_DEFINE_ALERT(dht_reply_data_done_alert);
const static int static_category = alert::dht_notification;
virtual std::string message() const;
std::string const m_username;
std::string const m_resource;
bool m_multi;
bool m_is_neighbor;
bool m_got_data;
};
struct TORRENT_EXPORT dht_get_data_alert: alert
{
dht_get_data_alert(entry const& target, bool possiblyNeighbor, bool hasData)

View File

@ -98,7 +98,8 @@ namespace libtorrent { namespace dht
int timeutc, int seq);
void getData(std::string const &username, std::string const &resource, bool multi,
boost::function<void(entry::list_type const&)> f);
boost::function<void(entry::list_type const&)> fdata,
boost::function<void(bool, bool)> fdone);
void dht_status(session_status& s);
void network_stats(int& sent, int& received);

View File

@ -253,7 +253,8 @@ public:
int timeutc, int seq);
void getData(std::string const &username, std::string const &resource, bool multi,
boost::function<void(entry::list_type const&)> f);
boost::function<void(entry::list_type const&)> fdata,
boost::function<void(bool, bool)> fdone);
bool verify_token(std::string const& token, char const* info_hash
, udp::endpoint const& addr);

View File

@ -358,6 +358,13 @@ namespace libtorrent {
return msg;
}
std::string dht_reply_data_done_alert::message() const
{
char msg[200];
snprintf(msg, sizeof(msg), "reply of getData done");
return msg;
}
std::string dht_get_data_alert::message() const
{
char msg[200];

View File

@ -430,9 +430,10 @@ namespace libtorrent { namespace dht
}
void dht_tracker::getData(std::string const &username, std::string const &resource, bool multi,
boost::function<void(entry::list_type const&)> f)
boost::function<void(entry::list_type const&)> fdata,
boost::function<void(bool, bool)> fdone)
{
m_dht.getData(username, resource, multi, f);
m_dht.getData(username, resource, multi, fdata, fdone);
}

View File

@ -365,6 +365,14 @@ namespace
node.m_rpc.invoke(e, i->first.ep(), o);
}
}
void getDataDone_fun(std::vector<std::pair<node_entry, std::string> > const& node_results,
bool got_data, node_impl& node,
boost::function<void(bool, bool)> fdone)
{
bool is_neighbor = false;
fdone(is_neighbor, got_data);
}
}
void node_impl::add_router_node(udp::endpoint router)
@ -431,15 +439,17 @@ void node_impl::putData(std::string const &username, std::string const &resource
}
void node_impl::getData(std::string const &username, std::string const &resource, bool multi,
boost::function<void(entry::list_type const&)> f)
boost::function<void(entry::list_type const&)> fdata,
boost::function<void(bool, bool)> fdone)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "getData [ username: " << info_hash << " res: " << resource << " ]" ;
#endif
// search for nodes with ids close to id or with peers
// for info-hash id. callback is used to return data.
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi, f,
boost::bind(&nop), false));
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
fdata,
boost::bind(&getDataDone_fun, _1, _2, boost::ref(*this), fdone), false));
ta->start();
}

View File

@ -5748,9 +5748,22 @@ retry:
}
}
void getDataDone_fun(aux::session_impl *si,
std::string const &username, std::string const &resource, bool multi,
bool is_neighbor, bool got_data)
{
if( si->m_alerts.should_post<dht_reply_data_done_alert>() ) {
si->m_alerts.post_alert(
dht_reply_data_done_alert(username, resource, multi,
is_neighbor, got_data));
}
}
void session_impl::dht_getData(std::string const &username, std::string const &resource, bool multi)
{
if (m_dht) m_dht->getData(username, resource, multi, boost::bind( post_dht_getData, this, _1));
if (m_dht) m_dht->getData(username, resource, multi,
boost::bind( post_dht_getData, this, _1),
boost::bind( getDataDone_fun, this, username, resource, multi, _1, _2));
}
void session_impl::on_dht_router_name_lookup(error_code const& e

View File

@ -218,6 +218,8 @@ void ThreadMaintainDHTNodes()
void ThreadSessionAlerts()
{
static map<uint256, entry> neighborCheck;
while(!ses) {
MilliSleep(200);
}
@ -271,14 +273,53 @@ void ThreadSessionAlerts()
if (gd)
{
if( gd->m_possiblyNeighbor ) {
printf("possiblyNeighbor of [%s,%s,%s]\n",
gd->m_target.find_key("n")->string().c_str(),
gd->m_target.find_key("r")->string().c_str(),
gd->m_target.find_key("t")->string().c_str());
entry const *n = gd->m_target.find_key("n");
entry const *r = gd->m_target.find_key("r");
entry const *t = gd->m_target.find_key("t");
if( n && n->type() == entry::string_t &&
r && r->type() == entry::string_t &&
t && t->type() == entry::string_t) {
// now we do our own search to make sure we are really close to this target
uint256 th = dhtTargetHash(n->string(), r->string(), t->string());
if( !neighborCheck.count(th) ) {
printf("possiblyNeighbor of [%s,%s,%s]\n",
n->string().c_str(),
r->string().c_str(),
t->string().c_str());
neighborCheck[th] = gd->m_target;
ses->dht_getData(n->string(), r->string(), t->string() == "m");
}
}
}
continue;
}
dht_reply_data_done_alert const* dd = alert_cast<dht_reply_data_done_alert>(*i);
if (dd)
{
printf("get_data_gone [%s,%s,%s] is_neighbor=%d got_data=%d\n",
dd->m_username.c_str(), dd->m_resource.c_str(), dd->m_multi ? "m" : "s",
dd->m_is_neighbor, dd->m_got_data);
uint256 th = dhtTargetHash(dd->m_username, dd->m_resource, dd->m_multi ? "m" : "s");
{
LOCK(cs_dhtgetMap);
std::map<uint256, alert_manager*>::iterator mi = m_dhtgetMap.find(th);
if( mi != m_dhtgetMap.end() && !dd->m_got_data ) {
// post alert to return from wait_for_alert in dhtget()
alert_manager *am = (*mi).second;
am->post_alert(*dd);
}
}
continue;
}
/*
save_resume_data_alert const* rd = alert_cast<save_resume_data_alert>(*i);
@ -500,13 +541,17 @@ Value dhtget(const Array& params, bool fHelp)
ses->dht_getData(strUsername, strResource, multi);
Value ret;
Value ret = Array();
if( am.wait_for_alert(seconds(10)) ) {
if( am.wait_for_alert(seconds(20)) ) {
std::auto_ptr<alert> a(am.get());
dht_reply_data_alert const* rd = alert_cast<dht_reply_data_alert>(&(*a));
ret = entryToJson(rd->m_lst);
if( rd ) {
ret = entryToJson(rd->m_lst);
} else {
// cast failed => dht_reply_data_done_alert => no data
}
}
{