Browse Source

Merge pull request #235 from dryabov/dht_refresh_1

faster DHT refresh 2
miguelfreitas
miguelfreitas 11 years ago
parent
commit
8ffcef4a5c
  1. 10
      libtorrent/include/libtorrent/kademlia/node.hpp
  2. 2
      libtorrent/include/libtorrent/ptime.hpp
  3. 104
      libtorrent/src/kademlia/node.cpp

10
libtorrent/include/libtorrent/kademlia/node.hpp

@ -118,18 +118,20 @@ struct torrent_entry
struct dht_storage_item struct dht_storage_item
{ {
// FIXME: optimize so bdecode is not needed all the time // FIXME: optimize so bdecode is not needed all the time
dht_storage_item() : p(), sig_p(), sig_user(), local_add_time(0) {} dht_storage_item() : p(), sig_p(), sig_user(), local_add_time(0), confirmed(true), next_refresh_time(0) {}
dht_storage_item(std::string const &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user) dht_storage_item(std::string const &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user)
: p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()), : p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()),
local_add_time(0) {} local_add_time(0), confirmed(true), next_refresh_time(0) {}
dht_storage_item(std::string const &_p, std::string const &_sig_p, std::string const &_sig_user) dht_storage_item(std::string const &_p, std::string const &_sig_p, std::string const &_sig_user)
: p(_p), sig_p(_sig_p), sig_user(_sig_user), local_add_time(0) {} : p(_p), sig_p(_sig_p), sig_user(_sig_user), local_add_time(0), confirmed(true), next_refresh_time(0) {}
std::string p; std::string p;
std::string sig_p; std::string sig_p;
std::string sig_user; std::string sig_user;
boost::int64_t local_add_time; boost::int64_t local_add_time;
// the last time we heard about this // the last time we heard about this
//ptime last_seen; //ptime last_seen;
bool confirmed;
ptime next_refresh_time;
}; };
@ -300,8 +302,6 @@ private:
ptime m_last_tracker_tick; ptime m_last_tracker_tick;
ptime m_next_storage_refresh; ptime m_next_storage_refresh;
int m_refresh_per_tick;
std::pair<node_id, int> m_last_refreshed_item;
// secret random numbers used to create write tokens // secret random numbers used to create write tokens
int m_secret[2]; int m_secret[2];

2
libtorrent/include/libtorrent/ptime.hpp

@ -72,6 +72,8 @@ namespace libtorrent
{ return time_duration(diff + c.diff); } { return time_duration(diff + c.diff); }
time_duration operator-(time_duration const& c) time_duration operator-(time_duration const& c)
{ return time_duration(diff - c.diff); } { return time_duration(diff - c.diff); }
time_duration operator*(const double rhs) const
{ return time_duration( boost::int64_t (diff * rhs) ); }
// internal // internal
boost::int64_t diff; boost::int64_t diff;

104
libtorrent/src/kademlia/node.cpp

@ -36,6 +36,9 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/function/function1.hpp> #include <boost/function/function1.hpp>
//#include <boost/date_time/posix_time/time_formatters_limited.hpp> //#include <boost/date_time/posix_time/time_formatters_limited.hpp>
#include <boost/random.hpp>
#include <boost/nondet_random.hpp>
#include <boost/random/mersenne_twister.hpp>
#include "libtorrent/io.hpp" #include "libtorrent/io.hpp"
#include "libtorrent/bencode.hpp" #include "libtorrent/bencode.hpp"
@ -107,7 +110,6 @@ node_impl::node_impl(alert_dispatcher* alert_disp
, m_rpc(m_id, m_table, sock, observer) , m_rpc(m_id, m_table, sock, observer)
, m_last_tracker_tick(time_now()) , m_last_tracker_tick(time_now())
, m_next_storage_refresh(time_now()) , m_next_storage_refresh(time_now())
, m_last_refreshed_item()
, m_post_alert(alert_disp) , m_post_alert(alert_disp)
, m_sock(sock) , m_sock(sock)
{ {
@ -308,6 +310,34 @@ namespace
} }
} }
double getRandom()
{
static boost::mt19937 m_random_seed;
static boost::uniform_real<double> m_random_dist(0.0, 1.0);
static boost::variate_generator<boost::mt19937&, boost::uniform_real<double> > m_random(m_random_seed, m_random_dist);
return m_random();
}
ptime getNextRefreshTime(bool confirmed = true)
{
static ptime nextRefreshTime[2] = { ptime(0) };
nextRefreshTime[confirmed] = std::max(
nextRefreshTime[confirmed] + milliseconds(500),
// add +/-10% diffusion to next refresh time
time_now() + minutes(confirmed ? 60 : 1) * ( 0.9 + 0.2 * getRandom() )
);
return nextRefreshTime[confirmed];
}
void putData_confirm(entry::list_type const& values_list, dht_storage_item& item)
{
if( !item.confirmed && !values_list.empty() ) {
item.confirmed = true;
item.next_refresh_time = getNextRefreshTime();
}
}
void putData_fun(std::vector<std::pair<node_entry, std::string> > const& v, void putData_fun(std::vector<std::pair<node_entry, std::string> > const& v,
node_impl& node, node_impl& node,
entry const &p, std::string const &sig_p, std::string const &sig_user) entry const &p, std::string const &sig_p, std::string const &sig_user)
@ -465,6 +495,7 @@ void node_impl::putDataSigned(std::string const &username, std::string const &re
dht_storage_item item(str_p, sig_p, sig_user); dht_storage_item item(str_p, sig_p, sig_user);
item.local_add_time = time(NULL); item.local_add_time = time(NULL);
item.confirmed = false;
std::vector<char> vbuf; std::vector<char> vbuf;
bencode(std::back_inserter(vbuf), p["v"]); bencode(std::back_inserter(vbuf), p["v"]);
std::pair<char const*, int> bufv = std::make_pair(vbuf.data(), vbuf.size()); std::pair<char const*, int> bufv = std::make_pair(vbuf.data(), vbuf.size());
@ -535,22 +566,17 @@ static void processEntryForHashtags(lazy_entry &p)
bool node_impl::refresh_storage() { bool node_impl::refresh_storage() {
bool did_something = false; bool did_something = false;
bool refresh_next_item = false;
int num_refreshable = 0;
for (dht_storage_table_t::const_iterator i = m_storage_table.begin(), ptime const now = time_now();
m_next_storage_refresh = now + minutes(60);
for (dht_storage_table_t::iterator i = m_storage_table.begin(),
end(m_storage_table.end()); i != end; ++i ) end(m_storage_table.end()); i != end; ++i )
{ {
dht_storage_list_t const& lsto = i->second; dht_storage_list_t& lsto = i->second;
dht_storage_list_t::const_iterator j(lsto.begin()), jEnd(lsto.end()); dht_storage_list_t::iterator j(lsto.begin()), jEnd(lsto.end());
for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) { for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) {
dht_storage_item const& item = *j; dht_storage_item& item = *j;
if( std::make_pair(i->first,jIdx) == m_last_refreshed_item ) {
refresh_next_item = true;
num_refreshable++;
continue;
}
#ifdef ENABLE_DHT_ITEM_EXPIRE #ifdef ENABLE_DHT_ITEM_EXPIRE
if( has_expired(item, true) ) { if( has_expired(item, true) ) {
@ -558,6 +584,13 @@ bool node_impl::refresh_storage() {
} }
#endif #endif
if( item.next_refresh_time > now ) {
if( m_next_storage_refresh > item.next_refresh_time ) {
m_next_storage_refresh = item.next_refresh_time;
}
continue;
}
lazy_entry p; lazy_entry p;
int pos; int pos;
error_code err; error_code err;
@ -577,11 +610,7 @@ bool node_impl::refresh_storage() {
// refresh only signed single posts and mentions // refresh only signed single posts and mentions
if( !multi || (multi && resource == "mention") || if( !multi || (multi && resource == "mention") ||
(multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) { (multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) {
num_refreshable++;
if( refresh_next_item && m_refresh_per_tick ) {
--m_refresh_per_tick;
m_last_refreshed_item = std::make_pair(i->first,jIdx);
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
printf("node dht: refreshing storage: [%s,%s,%s]\n", printf("node dht: refreshing storage: [%s,%s,%s]\n",
username.c_str(), username.c_str(),
@ -597,29 +626,19 @@ bool node_impl::refresh_storage() {
// search for nodes with ids close to id or with peers // search for nodes with ids close to id or with peers
// for info-hash id. then send putData to them. // for info-hash id. then send putData to them.
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi, boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
boost::bind(&nop), boost::bind(&putData_confirm, _1, boost::ref(item)),
boost::bind(&putData_fun, _1, boost::ref(*this), boost::bind(&putData_fun, _1, boost::ref(*this),
entryP, item.sig_p, item.sig_user), true)); entryP, item.sig_p, item.sig_user), item.confirmed));
ta->start(); ta->start();
did_something = true; did_something = true;
}
}
}
}
if( !did_something && m_storage_table.size() ) { item.next_refresh_time = getNextRefreshTime(item.confirmed);
m_last_refreshed_item = std::make_pair(m_storage_table.begin()->first,0);
} }
const time_duration tickInterval = seconds(5); if( m_next_storage_refresh > item.next_refresh_time ) {
const time_duration fullRefreshInterval = minutes(30); m_next_storage_refresh = item.next_refresh_time;
const time_duration sleepInterval = minutes(10); }
const time_duration sleepToRefresh = std::min( sleepInterval, fullRefreshInterval / (num_refreshable ? num_refreshable : 1) ); }
m_next_storage_refresh = time_now() + sleepToRefresh;
if( sleepToRefresh > tickInterval ) {
m_refresh_per_tick = 1;
} else {
m_refresh_per_tick = tickInterval.diff/sleepToRefresh.diff;
} }
/* /*
@ -704,6 +723,7 @@ bool node_impl::save_storage(entry &save) const {
entry_item["sig_user"] = item.sig_user; entry_item["sig_user"] = item.sig_user;
if( item.local_add_time ) if( item.local_add_time )
entry_item["local_add_time"] = item.local_add_time; entry_item["local_add_time"] = item.local_add_time;
entry_item["confirmed"] = item.confirmed ? 1 : 0;
save_list.list().push_back(entry_item); save_list.list().push_back(entry_item);
} }
} }
@ -720,6 +740,9 @@ void node_impl::load_storage(entry const* e) {
if( !e || e->type() != entry::dictionary_t) if( !e || e->type() != entry::dictionary_t)
return; return;
ptime const now = time_now();
time_duration const refresh_interval = std::max( minutes(60), e->dict().size() * milliseconds(500) );
printf("node dht: loading storage... (%lu node_id keys)\n", e->dict().size()); printf("node dht: loading storage... (%lu node_id keys)\n", e->dict().size());
for (entry::dictionary_type::const_iterator i = e->dict().begin(); for (entry::dictionary_type::const_iterator i = e->dict().begin();
@ -738,6 +761,10 @@ void node_impl::load_storage(entry const* e) {
entry const *local_add_time( j->find_key("local_add_time") ); entry const *local_add_time( j->find_key("local_add_time") );
if(local_add_time) if(local_add_time)
item.local_add_time = local_add_time->integer(); item.local_add_time = local_add_time->integer();
entry const *confirmed( j->find_key("confirmed") );
if(confirmed) {
item.confirmed = (confirmed->integer() != 0);
}
// just for printf for now // just for printf for now
bool expired = has_expired(item); bool expired = has_expired(item);
@ -751,6 +778,10 @@ void node_impl::load_storage(entry const* e) {
int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500);
processEntryForHashtags(p); processEntryForHashtags(p);
// wait 1 minute (to load torrents, etc.)
// randomize refresh time
item.next_refresh_time = now + minutes(1) + refresh_interval * getRandom();
to_add.push_back(item); to_add.push_back(item);
#ifdef ENABLE_DHT_ITEM_EXPIRE #ifdef ENABLE_DHT_ITEM_EXPIRE
} }
@ -1493,6 +1524,11 @@ void node_impl::incoming_request(msg const& m, entry& e)
void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
bool multi, int seq, int height, std::pair<char const*, int> &bufv) bool multi, int seq, int height, std::pair<char const*, int> &bufv)
{ {
item.next_refresh_time = getNextRefreshTime(item.confirmed);
if( m_next_storage_refresh > item.next_refresh_time ) {
m_next_storage_refresh = item.next_refresh_time;
}
dht_storage_table_t::iterator i = m_storage_table.find(target); dht_storage_table_t::iterator i = m_storage_table.find(target);
if (i == m_storage_table.end()) { if (i == m_storage_table.end()) {
// make sure we don't add too many items // make sure we don't add too many items

Loading…
Cancel
Save