diff --git a/libtorrent/include/libtorrent/kademlia/node.hpp b/libtorrent/include/libtorrent/kademlia/node.hpp index 5d174084..9837cabb 100644 --- a/libtorrent/include/libtorrent/kademlia/node.hpp +++ b/libtorrent/include/libtorrent/kademlia/node.hpp @@ -52,6 +52,9 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include +#include +#include #include "libtorrent/socket.hpp" @@ -118,18 +121,20 @@ struct torrent_entry struct dht_storage_item { // FIXME: optimize so bdecode is not needed all the time - dht_storage_item() : p(), sig_p(), sig_user(), local_add_time(0) {} + dht_storage_item() : p(), sig_p(), sig_user(), local_add_time(0), confirmed(false), next_refresh_time(0) {} dht_storage_item(std::string const &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user) : p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()), - local_add_time(0) {} + local_add_time(0), confirmed(false), next_refresh_time(0) {} dht_storage_item(std::string const &_p, std::string const &_sig_p, std::string const &_sig_user) - : p(_p), sig_p(_sig_p), sig_user(_sig_user), local_add_time(0) {} + : p(_p), sig_p(_sig_p), sig_user(_sig_user), local_add_time(0), confirmed(false), next_refresh_time(0) {} std::string p; std::string sig_p; std::string sig_user; boost::int64_t local_add_time; // the last time we heard about this //ptime last_seen; + bool confirmed; + ptime next_refresh_time; }; @@ -300,14 +305,16 @@ private: ptime m_last_tracker_tick; ptime m_next_storage_refresh; - int m_refresh_per_tick; - std::pair m_last_refreshed_item; // secret random numbers used to create write tokens int m_secret[2]; alert_dispatcher* m_post_alert; udp_socket_interface* m_sock; + + boost::mt19937 m_random_seed; + boost::uniform_real<> m_random_dist; + boost::variate_generator > m_random; }; diff --git a/libtorrent/src/kademlia/node.cpp b/libtorrent/src/kademlia/node.cpp index 8712afb7..a08322fd 100644 --- a/libtorrent/src/kademlia/node.cpp +++ b/libtorrent/src/kademlia/node.cpp @@ -107,9 +107,10 @@ node_impl::node_impl(alert_dispatcher* alert_disp , m_rpc(m_id, m_table, sock, observer) , m_last_tracker_tick(time_now()) , m_next_storage_refresh(time_now()) - , m_last_refreshed_item() , m_post_alert(alert_disp) , m_sock(sock) + , m_random_dist(0.0, 1.0) + , m_random(m_random_seed, m_random_dist) { m_secret[0] = random(); m_secret[1] = std::rand(); @@ -308,6 +309,11 @@ namespace } } + void putData_confirm(dht_storage_item& item) + { + item.confirmed = true; + } + void putData_fun(std::vector > const& v, node_impl& node, entry const &p, std::string const &sig_p, std::string const &sig_user) @@ -535,22 +541,17 @@ static void processEntryForHashtags(lazy_entry &p) bool node_impl::refresh_storage() { bool did_something = false; - bool refresh_next_item = false; - int num_refreshable = 0; - for (dht_storage_table_t::const_iterator i = m_storage_table.begin(), + ptime const now = time_now(); + m_next_storage_refresh = now + minutes(10); + + for (dht_storage_table_t::iterator i = m_storage_table.begin(), end(m_storage_table.end()); i != end; ++i ) { - dht_storage_list_t const& lsto = i->second; - dht_storage_list_t::const_iterator j(lsto.begin()), jEnd(lsto.end()); + dht_storage_list_t& lsto = i->second; + dht_storage_list_t::iterator j(lsto.begin()), jEnd(lsto.end()); for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) { - dht_storage_item const& item = *j; - - if( std::make_pair(i->first,jIdx) == m_last_refreshed_item ) { - refresh_next_item = true; - num_refreshable++; - continue; - } + dht_storage_item& item = *j; #ifdef ENABLE_DHT_ITEM_EXPIRE if( has_expired(item, true) ) { @@ -558,6 +559,13 @@ bool node_impl::refresh_storage() { } #endif + if( item.next_refresh_time > now ) { + if( m_next_storage_refresh > item.next_refresh_time ) { + m_next_storage_refresh = item.next_refresh_time; + } + continue; + } + lazy_entry p; int pos; error_code err; @@ -577,11 +585,7 @@ bool node_impl::refresh_storage() { // refresh only signed single posts and mentions if( !multi || (multi && resource == "mention") || (multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) { - num_refreshable++; - if( refresh_next_item && m_refresh_per_tick ) { - --m_refresh_per_tick; - m_last_refreshed_item = std::make_pair(i->first,jIdx); #ifdef TORRENT_DHT_VERBOSE_LOGGING printf("node dht: refreshing storage: [%s,%s,%s]\n", username.c_str(), @@ -589,39 +593,29 @@ bool node_impl::refresh_storage() { target->dict_find_string_value("t").c_str()); #endif - processEntryForHashtags(p); - - entry entryP; - entryP = p; // lazy to non-lazy + processEntryForHashtags(p); - // search for nodes with ids close to id or with peers - // for info-hash id. then send putData to them. - boost::intrusive_ptr ta(new dht_get(*this, username, resource, multi, - boost::bind(&nop), - boost::bind(&putData_fun, _1, boost::ref(*this), - entryP, item.sig_p, item.sig_user), true)); - ta->start(); - did_something = true; + entry entryP; + entryP = p; // lazy to non-lazy + + // search for nodes with ids close to id or with peers + // for info-hash id. then send putData to them. + boost::intrusive_ptr ta(new dht_get(*this, username, resource, multi, + boost::bind(&putData_confirm, item), + boost::bind(&putData_fun, _1, boost::ref(*this), + entryP, item.sig_p, item.sig_user), !item.confirmed)); + ta->start(); + did_something = true; + + // add 10% diffusion to next refresh time + item.next_refresh_time = now + minutes(item.confirmed ? 60 : 1) * (1. + 0.1 * (2. * m_random() - 1.)); + if( m_next_storage_refresh > item.next_refresh_time ) { + m_next_storage_refresh = item.next_refresh_time; } } } } - if( !did_something && m_storage_table.size() ) { - m_last_refreshed_item = std::make_pair(m_storage_table.begin()->first,0); - } - - const time_duration tickInterval = seconds(5); - const time_duration fullRefreshInterval = minutes(30); - const time_duration sleepInterval = minutes(10); - const time_duration sleepToRefresh = std::min( sleepInterval, fullRefreshInterval / (num_refreshable ? num_refreshable : 1) ); - m_next_storage_refresh = time_now() + sleepToRefresh; - if( sleepToRefresh > tickInterval ) { - m_refresh_per_tick = 1; - } else { - m_refresh_per_tick = tickInterval.diff/sleepToRefresh.diff; - } - /* printf("node dht: next storage refresh in %s\n", boost::posix_time::to_simple_string(sleepToRefresh).c_str() ); @@ -704,6 +698,8 @@ bool node_impl::save_storage(entry &save) const { entry_item["sig_user"] = item.sig_user; if( item.local_add_time ) entry_item["local_add_time"] = item.local_add_time; + if( item.confirmed ) + entry_item["confirmed"] = item.confirmed ? 0 : 1; save_list.list().push_back(entry_item); } } @@ -720,6 +716,8 @@ void node_impl::load_storage(entry const* e) { if( !e || e->type() != entry::dictionary_t) return; + ptime now = time_now(); + printf("node dht: loading storage... (%lu node_id keys)\n", e->dict().size()); for (entry::dictionary_type::const_iterator i = e->dict().begin(); @@ -738,6 +736,13 @@ void node_impl::load_storage(entry const* e) { entry const *local_add_time( j->find_key("local_add_time") ); if(local_add_time) item.local_add_time = local_add_time->integer(); + entry const *confirmed( j->find_key("confirmed") ); + if(confirmed) { + item.confirmed = (bool) confirmed->integer(); + } else { + // compatibility with twister 0.9.22 and below + item.confirmed = true; + } // just for printf for now bool expired = has_expired(item); @@ -751,6 +756,9 @@ void node_impl::load_storage(entry const* e) { int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); processEntryForHashtags(p); + // randomize refresh time + item.next_refresh_time = now + minutes(item.confirmed ? 60 : 1) * m_random(); + to_add.push_back(item); #ifdef ENABLE_DHT_ITEM_EXPIRE } @@ -1493,6 +1501,7 @@ void node_impl::incoming_request(msg const& m, entry& e) void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, bool multi, int seq, int height, std::pair &bufv) { + item.next_refresh_time = time_now() + minutes(item.confirmed ? 60 : 1); dht_storage_table_t::iterator i = m_storage_table.find(target); if (i == m_storage_table.end()) { // make sure we don't add too many items