From 3b0633fad454505b5a1b5d7e3914f6fdb6b0bbbf Mon Sep 17 00:00:00 2001 From: Miguel Freitas Date: Sat, 19 Jul 2014 12:38:04 -0300 Subject: [PATCH] global dht traffic seems too high: implement probabilistic refresh for users that post too much. this should reduce the traffic for now but me may want to tweak these settings later. the most recent posts are priorized regardless of how much the user posts. --- .../include/libtorrent/kademlia/node.hpp | 5 +- libtorrent/src/kademlia/node.cpp | 102 +++++++++++++----- src/clientversion.h | 2 +- 3 files changed, 78 insertions(+), 31 deletions(-) diff --git a/libtorrent/include/libtorrent/kademlia/node.hpp b/libtorrent/include/libtorrent/kademlia/node.hpp index cfedcb3f..56a2bb5d 100644 --- a/libtorrent/include/libtorrent/kademlia/node.hpp +++ b/libtorrent/include/libtorrent/kademlia/node.hpp @@ -177,6 +177,7 @@ class TORRENT_EXTRA_EXPORT node_impl : boost::noncopyable typedef std::map table_t; typedef std::list dht_storage_list_t; typedef std::map dht_storage_table_t; +typedef std::map< std::string, std::pair > dht_posts_by_user_t; // total known, latest known public: node_impl(alert_dispatcher* alert_disp, udp_socket_interface* sock @@ -287,8 +288,9 @@ private: std::set m_running_requests; void incoming_request(msg const& h, entry& e); - void store_dht_item(dht_storage_item &item, big_number const &target, + bool store_dht_item(dht_storage_item &item, big_number const &target, bool multi, int seq, int height, std::pair &bufv); + void process_newly_stored_entry(const lazy_entry &p); node_id m_id; @@ -299,6 +301,7 @@ public: private: table_t m_map; dht_storage_table_t m_storage_table; + dht_posts_by_user_t m_posts_by_user; ptime m_last_tracker_tick; ptime m_next_storage_refresh; diff --git a/libtorrent/src/kademlia/node.cpp b/libtorrent/src/kademlia/node.cpp index baeae422..2fb8980b 100644 --- a/libtorrent/src/kademlia/node.cpp +++ b/libtorrent/src/kademlia/node.cpp @@ -59,7 +59,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/rsa.hpp" #include "../../src/twister.h" -#define ENABLE_DHT_ITEM_EXPIRE /* refresh dht itens we know by putting them to other peers every 60 minutes. * this period must be small enough to ensure persistency and big enough to @@ -122,6 +121,8 @@ node_impl::node_impl(alert_dispatcher* alert_disp , m_id(nid == (node_id::min)() || !verify_id(nid, external_address) ? generate_id(external_address) : nid) , m_table(m_id, 8, settings) , m_rpc(m_id, m_table, sock, observer) + , m_storage_table() + , m_posts_by_user() , m_last_tracker_tick(time_now()) , m_next_storage_refresh(time_now()) , m_post_alert(alert_disp) @@ -517,7 +518,11 @@ void node_impl::putDataSigned(std::string const &username, std::string const &re int seq = (seqEntry && seqEntry->type() == entry::int_t) ? seqEntry->integer() : -1; int height = heightEntry->integer(); - store_dht_item(item, ta->target(), multi, seq, height, bufv); + if( store_dht_item(item, ta->target(), multi, seq, height, bufv) ) { + // local items not yet processed for hashtags and post counts + // not that bad - but we may eventually want to implement this + //process_newly_stored_entry(p); + } } // now send it to the network (start transversal algorithm) @@ -555,11 +560,17 @@ void node_impl::tick() } } -static void processEntryForHashtags(lazy_entry &p) +void node_impl::process_newly_stored_entry(const lazy_entry &p) { const lazy_entry *target = p.dict_find_dict("target"); - bool multi = (target && target->dict_find_string_value("t") == "m"); + if( !target ) + return; + std::string username = target->dict_find_string_value("n"); + std::string resource = target->dict_find_string_value("r"); + bool multi = (target->dict_find_string_value("t") == "m"); + + // update hashtags stats const lazy_entry *v = p.dict_find_dict("v"); if( v && !multi ) { const lazy_entry *userpost = v->dict_find_dict("userpost"); @@ -577,6 +588,15 @@ static void processEntryForHashtags(lazy_entry &p) } } } + + // update posts stats + std::string resourcePost("post"); + if( resource.compare(0, resourcePost.length(), resourcePost) == 0 ) { + int resourceNumber = atoi( resource.c_str() + resourcePost.length() ); + std::pair &userStats = m_posts_by_user[username]; + userStats.first++; //total + userStats.second = std::max(userStats.second, resourceNumber); + } } bool node_impl::refresh_storage() { @@ -593,19 +613,22 @@ bool node_impl::refresh_storage() { for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) { dht_storage_item& item = *j; -#ifdef ENABLE_DHT_ITEM_EXPIRE if( has_expired(item, true) ) { continue; } -#endif if( item.next_refresh_time > now ) { + // this item won't be refreshed this time, + // but it may shorten sleep time to next refresh if( m_next_storage_refresh > item.next_refresh_time ) { m_next_storage_refresh = item.next_refresh_time; } continue; } + bool skip = false; + bool local_and_recent = (item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)); + lazy_entry p; int pos; error_code err; @@ -614,27 +637,46 @@ bool node_impl::refresh_storage() { int height = p.dict_find_int_value("height"); if( height > getBestHeight() ) { - continue; // how? + skip = true; // invalid? how come? } const lazy_entry *target = p.dict_find_dict("target"); + if( !target ) + continue; std::string username = target->dict_find_string_value("n"); std::string resource = target->dict_find_string_value("r"); bool multi = (target->dict_find_string_value("t") == "m"); - // refresh only signed single posts and mentions - if( !multi || (multi && resource == "mention") || - (multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) { + // probabilistic refresh for users that post a lot. + // note: we don't know the true total number of posts by user, but + // rather just what we have stored and still not expired. + std::string resourcePost("post"); + if( resource.compare(0, resourcePost.length(), resourcePost) == 0 ) { + int resourceNumber = atoi( resource.c_str() + resourcePost.length() ); + std::pair &userStats = m_posts_by_user[username]; + int knownPosts = userStats.first; + int lastPost = userStats.second; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + printf("node dht: probabilistic post refresh for user: %s (total: %d last: %d cur: %d)\n", + username.c_str(), knownPosts, lastPost, resourceNumber); +#endif + if( resourceNumber < lastPost - 100 && knownPosts > 25 ) { + double p = 25. / (knownPosts - 25); + if( getRandom() > p ) { + skip = true; + } + } + } + // refresh only signed single posts and mentions + if( !skip && + (!multi || resource == "mention" || local_and_recent) ) { #ifdef TORRENT_DHT_VERBOSE_LOGGING printf("node dht: refreshing storage: [%s,%s,%s]\n", username.c_str(), resource.c_str(), target->dict_find_string_value("t").c_str()); #endif - - processEntryForHashtags(p); - entry entryP; entryP = p; // lazy to non-lazy @@ -646,21 +688,20 @@ bool node_impl::refresh_storage() { entryP, item.sig_p, item.sig_user), item.confirmed)); ta->start(); did_something = true; - - item.next_refresh_time = getNextRefreshTime(item.confirmed); } + // we are supposed to have refreshed this item by now (but we may have not - see above) + // so regardless of what we actually did, calculate when the next refresh is due + item.next_refresh_time = getNextRefreshTime(item.confirmed); if( m_next_storage_refresh > item.next_refresh_time ) { m_next_storage_refresh = item.next_refresh_time; } } } - /* - printf("node dht: next storage refresh in %s\n", - boost::posix_time::to_simple_string(sleepToRefresh).c_str() ); + printf("node dht: next storage refresh in %d\n", + m_next_storage_refresh - now ); */ - return did_something; } @@ -781,26 +822,21 @@ void node_impl::load_storage(entry const* e) { item.confirmed = (confirmed->integer() != 0); } - // just for printf for now bool expired = has_expired(item); -#ifdef ENABLE_DHT_ITEM_EXPIRE if( !expired ) { -#endif lazy_entry p; int pos; error_code err; // FIXME: optimize to avoid bdecode (store seq separated, etc) int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); - processEntryForHashtags(p); + process_newly_stored_entry(p); // wait 1 minute (to load torrents, etc.) // randomize refresh time item.next_refresh_time = now + minutes(1) + refresh_interval * getRandom(); to_add.push_back(item); -#ifdef ENABLE_DHT_ITEM_EXPIRE } -#endif } m_storage_table.insert(std::make_pair(target, to_add)); } @@ -1409,8 +1445,10 @@ void node_impl::incoming_request(msg const& m, entry& e) dht_storage_item item(str_p, msg_keys[mk_sig_p], msg_keys[mk_sig_user]); std::pair bufv = msg_keys[mk_v]->data_section(); - store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0, - msg_keys[mk_height]->int_value(), bufv); + if( store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0, + msg_keys[mk_height]->int_value(), bufv) ) { + process_newly_stored_entry(*msg_keys[mk_p]); + } } else if (strcmp(query, "getData") == 0) { @@ -1536,9 +1574,11 @@ void node_impl::incoming_request(msg const& m, entry& e) } } -void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, +bool node_impl::store_dht_item(dht_storage_item &item, const big_number &target, bool multi, int seq, int height, std::pair &bufv) { + bool stored = false; + item.next_refresh_time = getNextRefreshTime(item.confirmed); if( m_next_storage_refresh > item.next_refresh_time ) { m_next_storage_refresh = item.next_refresh_time; @@ -1557,6 +1597,7 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, boost::tie(i, boost::tuples::ignore) = m_storage_table.insert( std::make_pair(target, to_add)); + stored = true; } else { dht_storage_list_t & lsto = i->second; @@ -1574,10 +1615,11 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, if( !multi ) { if( seq > p.dict_find_int("seq")->int_value() ) { olditem = item; + stored = true; } else { // don't report this error (because of refresh storage) //incoming_error(e, "old sequence number"); - return; + break; } } else { // compare contents before adding to the list @@ -1597,12 +1639,14 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, if(multi && j == rend) { // new entry lsto.insert(insert_pos, item); + stored = true; } if(lsto.size() > m_settings.max_entries_per_multi) { lsto.resize(m_settings.max_entries_per_multi); } } + return stored; } } } // namespace libtorrent::dht diff --git a/src/clientversion.h b/src/clientversion.h index b281df69..f551af85 100644 --- a/src/clientversion.h +++ b/src/clientversion.h @@ -8,7 +8,7 @@ // These need to be macros, as version.cpp's and bitcoin-qt.rc's voodoo requires it #define CLIENT_VERSION_MAJOR 0 #define CLIENT_VERSION_MINOR 9 -#define CLIENT_VERSION_REVISION 23 +#define CLIENT_VERSION_REVISION 24 #define CLIENT_VERSION_BUILD 0 // Set to true for release, false for prerelease or test build