diff --git a/libtorrent/include/libtorrent/kademlia/node.hpp b/libtorrent/include/libtorrent/kademlia/node.hpp index cfedcb3f..56a2bb5d 100644 --- a/libtorrent/include/libtorrent/kademlia/node.hpp +++ b/libtorrent/include/libtorrent/kademlia/node.hpp @@ -177,6 +177,7 @@ class TORRENT_EXTRA_EXPORT node_impl : boost::noncopyable typedef std::map table_t; typedef std::list dht_storage_list_t; typedef std::map dht_storage_table_t; +typedef std::map< std::string, std::pair > dht_posts_by_user_t; // total known, latest known public: node_impl(alert_dispatcher* alert_disp, udp_socket_interface* sock @@ -287,8 +288,9 @@ private: std::set m_running_requests; void incoming_request(msg const& h, entry& e); - void store_dht_item(dht_storage_item &item, big_number const &target, + bool store_dht_item(dht_storage_item &item, big_number const &target, bool multi, int seq, int height, std::pair &bufv); + void process_newly_stored_entry(const lazy_entry &p); node_id m_id; @@ -299,6 +301,7 @@ public: private: table_t m_map; dht_storage_table_t m_storage_table; + dht_posts_by_user_t m_posts_by_user; ptime m_last_tracker_tick; ptime m_next_storage_refresh; diff --git a/libtorrent/src/kademlia/node.cpp b/libtorrent/src/kademlia/node.cpp index baeae422..2fb8980b 100644 --- a/libtorrent/src/kademlia/node.cpp +++ b/libtorrent/src/kademlia/node.cpp @@ -59,7 +59,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/rsa.hpp" #include "../../src/twister.h" -#define ENABLE_DHT_ITEM_EXPIRE /* refresh dht itens we know by putting them to other peers every 60 minutes. * this period must be small enough to ensure persistency and big enough to @@ -122,6 +121,8 @@ node_impl::node_impl(alert_dispatcher* alert_disp , m_id(nid == (node_id::min)() || !verify_id(nid, external_address) ? generate_id(external_address) : nid) , m_table(m_id, 8, settings) , m_rpc(m_id, m_table, sock, observer) + , m_storage_table() + , m_posts_by_user() , m_last_tracker_tick(time_now()) , m_next_storage_refresh(time_now()) , m_post_alert(alert_disp) @@ -517,7 +518,11 @@ void node_impl::putDataSigned(std::string const &username, std::string const &re int seq = (seqEntry && seqEntry->type() == entry::int_t) ? seqEntry->integer() : -1; int height = heightEntry->integer(); - store_dht_item(item, ta->target(), multi, seq, height, bufv); + if( store_dht_item(item, ta->target(), multi, seq, height, bufv) ) { + // local items not yet processed for hashtags and post counts + // not that bad - but we may eventually want to implement this + //process_newly_stored_entry(p); + } } // now send it to the network (start transversal algorithm) @@ -555,11 +560,17 @@ void node_impl::tick() } } -static void processEntryForHashtags(lazy_entry &p) +void node_impl::process_newly_stored_entry(const lazy_entry &p) { const lazy_entry *target = p.dict_find_dict("target"); - bool multi = (target && target->dict_find_string_value("t") == "m"); + if( !target ) + return; + std::string username = target->dict_find_string_value("n"); + std::string resource = target->dict_find_string_value("r"); + bool multi = (target->dict_find_string_value("t") == "m"); + + // update hashtags stats const lazy_entry *v = p.dict_find_dict("v"); if( v && !multi ) { const lazy_entry *userpost = v->dict_find_dict("userpost"); @@ -577,6 +588,15 @@ static void processEntryForHashtags(lazy_entry &p) } } } + + // update posts stats + std::string resourcePost("post"); + if( resource.compare(0, resourcePost.length(), resourcePost) == 0 ) { + int resourceNumber = atoi( resource.c_str() + resourcePost.length() ); + std::pair &userStats = m_posts_by_user[username]; + userStats.first++; //total + userStats.second = std::max(userStats.second, resourceNumber); + } } bool node_impl::refresh_storage() { @@ -593,19 +613,22 @@ bool node_impl::refresh_storage() { for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) { dht_storage_item& item = *j; -#ifdef ENABLE_DHT_ITEM_EXPIRE if( has_expired(item, true) ) { continue; } -#endif if( item.next_refresh_time > now ) { + // this item won't be refreshed this time, + // but it may shorten sleep time to next refresh if( m_next_storage_refresh > item.next_refresh_time ) { m_next_storage_refresh = item.next_refresh_time; } continue; } + bool skip = false; + bool local_and_recent = (item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)); + lazy_entry p; int pos; error_code err; @@ -614,27 +637,46 @@ bool node_impl::refresh_storage() { int height = p.dict_find_int_value("height"); if( height > getBestHeight() ) { - continue; // how? + skip = true; // invalid? how come? } const lazy_entry *target = p.dict_find_dict("target"); + if( !target ) + continue; std::string username = target->dict_find_string_value("n"); std::string resource = target->dict_find_string_value("r"); bool multi = (target->dict_find_string_value("t") == "m"); - // refresh only signed single posts and mentions - if( !multi || (multi && resource == "mention") || - (multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) { + // probabilistic refresh for users that post a lot. + // note: we don't know the true total number of posts by user, but + // rather just what we have stored and still not expired. + std::string resourcePost("post"); + if( resource.compare(0, resourcePost.length(), resourcePost) == 0 ) { + int resourceNumber = atoi( resource.c_str() + resourcePost.length() ); + std::pair &userStats = m_posts_by_user[username]; + int knownPosts = userStats.first; + int lastPost = userStats.second; +#ifdef TORRENT_DHT_VERBOSE_LOGGING + printf("node dht: probabilistic post refresh for user: %s (total: %d last: %d cur: %d)\n", + username.c_str(), knownPosts, lastPost, resourceNumber); +#endif + if( resourceNumber < lastPost - 100 && knownPosts > 25 ) { + double p = 25. / (knownPosts - 25); + if( getRandom() > p ) { + skip = true; + } + } + } + // refresh only signed single posts and mentions + if( !skip && + (!multi || resource == "mention" || local_and_recent) ) { #ifdef TORRENT_DHT_VERBOSE_LOGGING printf("node dht: refreshing storage: [%s,%s,%s]\n", username.c_str(), resource.c_str(), target->dict_find_string_value("t").c_str()); #endif - - processEntryForHashtags(p); - entry entryP; entryP = p; // lazy to non-lazy @@ -646,21 +688,20 @@ bool node_impl::refresh_storage() { entryP, item.sig_p, item.sig_user), item.confirmed)); ta->start(); did_something = true; - - item.next_refresh_time = getNextRefreshTime(item.confirmed); } + // we are supposed to have refreshed this item by now (but we may have not - see above) + // so regardless of what we actually did, calculate when the next refresh is due + item.next_refresh_time = getNextRefreshTime(item.confirmed); if( m_next_storage_refresh > item.next_refresh_time ) { m_next_storage_refresh = item.next_refresh_time; } } } - /* - printf("node dht: next storage refresh in %s\n", - boost::posix_time::to_simple_string(sleepToRefresh).c_str() ); + printf("node dht: next storage refresh in %d\n", + m_next_storage_refresh - now ); */ - return did_something; } @@ -781,26 +822,21 @@ void node_impl::load_storage(entry const* e) { item.confirmed = (confirmed->integer() != 0); } - // just for printf for now bool expired = has_expired(item); -#ifdef ENABLE_DHT_ITEM_EXPIRE if( !expired ) { -#endif lazy_entry p; int pos; error_code err; // FIXME: optimize to avoid bdecode (store seq separated, etc) int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); - processEntryForHashtags(p); + process_newly_stored_entry(p); // wait 1 minute (to load torrents, etc.) // randomize refresh time item.next_refresh_time = now + minutes(1) + refresh_interval * getRandom(); to_add.push_back(item); -#ifdef ENABLE_DHT_ITEM_EXPIRE } -#endif } m_storage_table.insert(std::make_pair(target, to_add)); } @@ -1409,8 +1445,10 @@ void node_impl::incoming_request(msg const& m, entry& e) dht_storage_item item(str_p, msg_keys[mk_sig_p], msg_keys[mk_sig_user]); std::pair bufv = msg_keys[mk_v]->data_section(); - store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0, - msg_keys[mk_height]->int_value(), bufv); + if( store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0, + msg_keys[mk_height]->int_value(), bufv) ) { + process_newly_stored_entry(*msg_keys[mk_p]); + } } else if (strcmp(query, "getData") == 0) { @@ -1536,9 +1574,11 @@ void node_impl::incoming_request(msg const& m, entry& e) } } -void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, +bool node_impl::store_dht_item(dht_storage_item &item, const big_number &target, bool multi, int seq, int height, std::pair &bufv) { + bool stored = false; + item.next_refresh_time = getNextRefreshTime(item.confirmed); if( m_next_storage_refresh > item.next_refresh_time ) { m_next_storage_refresh = item.next_refresh_time; @@ -1557,6 +1597,7 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, boost::tie(i, boost::tuples::ignore) = m_storage_table.insert( std::make_pair(target, to_add)); + stored = true; } else { dht_storage_list_t & lsto = i->second; @@ -1574,10 +1615,11 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, if( !multi ) { if( seq > p.dict_find_int("seq")->int_value() ) { olditem = item; + stored = true; } else { // don't report this error (because of refresh storage) //incoming_error(e, "old sequence number"); - return; + break; } } else { // compare contents before adding to the list @@ -1597,12 +1639,14 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, if(multi && j == rend) { // new entry lsto.insert(insert_pos, item); + stored = true; } if(lsto.size() > m_settings.max_entries_per_multi) { lsto.resize(m_settings.max_entries_per_multi); } } + return stored; } } } // namespace libtorrent::dht diff --git a/src/clientversion.h b/src/clientversion.h index b281df69..f551af85 100644 --- a/src/clientversion.h +++ b/src/clientversion.h @@ -8,7 +8,7 @@ // These need to be macros, as version.cpp's and bitcoin-qt.rc's voodoo requires it #define CLIENT_VERSION_MAJOR 0 #define CLIENT_VERSION_MINOR 9 -#define CLIENT_VERSION_REVISION 23 +#define CLIENT_VERSION_REVISION 24 #define CLIENT_VERSION_BUILD 0 // Set to true for release, false for prerelease or test build