Browse Source

global dht traffic seems too high: implement probabilistic refresh for users that post too much.

this should reduce the traffic for now but me may want to tweak these settings later.
the most recent posts are priorized regardless of how much the user posts.
miguelfreitas
Miguel Freitas 11 years ago
parent
commit
3b0633fad4
  1. 5
      libtorrent/include/libtorrent/kademlia/node.hpp
  2. 102
      libtorrent/src/kademlia/node.cpp
  3. 2
      src/clientversion.h

5
libtorrent/include/libtorrent/kademlia/node.hpp

@ -177,6 +177,7 @@ class TORRENT_EXTRA_EXPORT node_impl : boost::noncopyable
typedef std::map<node_id, torrent_entry> table_t; typedef std::map<node_id, torrent_entry> table_t;
typedef std::list<dht_storage_item> dht_storage_list_t; typedef std::list<dht_storage_item> dht_storage_list_t;
typedef std::map<node_id, dht_storage_list_t> dht_storage_table_t; typedef std::map<node_id, dht_storage_list_t> dht_storage_table_t;
typedef std::map< std::string, std::pair<int,int> > dht_posts_by_user_t; // total known, latest known
public: public:
node_impl(alert_dispatcher* alert_disp, udp_socket_interface* sock node_impl(alert_dispatcher* alert_disp, udp_socket_interface* sock
@ -287,8 +288,9 @@ private:
std::set<traversal_algorithm*> m_running_requests; std::set<traversal_algorithm*> m_running_requests;
void incoming_request(msg const& h, entry& e); void incoming_request(msg const& h, entry& e);
void store_dht_item(dht_storage_item &item, big_number const &target, bool store_dht_item(dht_storage_item &item, big_number const &target,
bool multi, int seq, int height, std::pair<char const*, int> &bufv); bool multi, int seq, int height, std::pair<char const*, int> &bufv);
void process_newly_stored_entry(const lazy_entry &p);
node_id m_id; node_id m_id;
@ -299,6 +301,7 @@ public:
private: private:
table_t m_map; table_t m_map;
dht_storage_table_t m_storage_table; dht_storage_table_t m_storage_table;
dht_posts_by_user_t m_posts_by_user;
ptime m_last_tracker_tick; ptime m_last_tracker_tick;
ptime m_next_storage_refresh; ptime m_next_storage_refresh;

102
libtorrent/src/kademlia/node.cpp

@ -59,7 +59,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/rsa.hpp" #include "libtorrent/rsa.hpp"
#include "../../src/twister.h" #include "../../src/twister.h"
#define ENABLE_DHT_ITEM_EXPIRE
/* refresh dht itens we know by putting them to other peers every 60 minutes. /* refresh dht itens we know by putting them to other peers every 60 minutes.
* this period must be small enough to ensure persistency and big enough to * this period must be small enough to ensure persistency and big enough to
@ -122,6 +121,8 @@ node_impl::node_impl(alert_dispatcher* alert_disp
, m_id(nid == (node_id::min)() || !verify_id(nid, external_address) ? generate_id(external_address) : nid) , m_id(nid == (node_id::min)() || !verify_id(nid, external_address) ? generate_id(external_address) : nid)
, m_table(m_id, 8, settings) , m_table(m_id, 8, settings)
, m_rpc(m_id, m_table, sock, observer) , m_rpc(m_id, m_table, sock, observer)
, m_storage_table()
, m_posts_by_user()
, m_last_tracker_tick(time_now()) , m_last_tracker_tick(time_now())
, m_next_storage_refresh(time_now()) , m_next_storage_refresh(time_now())
, m_post_alert(alert_disp) , m_post_alert(alert_disp)
@ -517,7 +518,11 @@ void node_impl::putDataSigned(std::string const &username, std::string const &re
int seq = (seqEntry && seqEntry->type() == entry::int_t) ? seqEntry->integer() : -1; int seq = (seqEntry && seqEntry->type() == entry::int_t) ? seqEntry->integer() : -1;
int height = heightEntry->integer(); int height = heightEntry->integer();
store_dht_item(item, ta->target(), multi, seq, height, bufv); if( store_dht_item(item, ta->target(), multi, seq, height, bufv) ) {
// local items not yet processed for hashtags and post counts
// not that bad - but we may eventually want to implement this
//process_newly_stored_entry(p);
}
} }
// now send it to the network (start transversal algorithm) // now send it to the network (start transversal algorithm)
@ -555,11 +560,17 @@ void node_impl::tick()
} }
} }
static void processEntryForHashtags(lazy_entry &p) void node_impl::process_newly_stored_entry(const lazy_entry &p)
{ {
const lazy_entry *target = p.dict_find_dict("target"); const lazy_entry *target = p.dict_find_dict("target");
bool multi = (target && target->dict_find_string_value("t") == "m"); if( !target )
return;
std::string username = target->dict_find_string_value("n");
std::string resource = target->dict_find_string_value("r");
bool multi = (target->dict_find_string_value("t") == "m");
// update hashtags stats
const lazy_entry *v = p.dict_find_dict("v"); const lazy_entry *v = p.dict_find_dict("v");
if( v && !multi ) { if( v && !multi ) {
const lazy_entry *userpost = v->dict_find_dict("userpost"); const lazy_entry *userpost = v->dict_find_dict("userpost");
@ -577,6 +588,15 @@ static void processEntryForHashtags(lazy_entry &p)
} }
} }
} }
// update posts stats
std::string resourcePost("post");
if( resource.compare(0, resourcePost.length(), resourcePost) == 0 ) {
int resourceNumber = atoi( resource.c_str() + resourcePost.length() );
std::pair<int,int> &userStats = m_posts_by_user[username];
userStats.first++; //total
userStats.second = std::max(userStats.second, resourceNumber);
}
} }
bool node_impl::refresh_storage() { bool node_impl::refresh_storage() {
@ -593,19 +613,22 @@ bool node_impl::refresh_storage() {
for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) { for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) {
dht_storage_item& item = *j; dht_storage_item& item = *j;
#ifdef ENABLE_DHT_ITEM_EXPIRE
if( has_expired(item, true) ) { if( has_expired(item, true) ) {
continue; continue;
} }
#endif
if( item.next_refresh_time > now ) { if( item.next_refresh_time > now ) {
// this item won't be refreshed this time,
// but it may shorten sleep time to next refresh
if( m_next_storage_refresh > item.next_refresh_time ) { if( m_next_storage_refresh > item.next_refresh_time ) {
m_next_storage_refresh = item.next_refresh_time; m_next_storage_refresh = item.next_refresh_time;
} }
continue; continue;
} }
bool skip = false;
bool local_and_recent = (item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL));
lazy_entry p; lazy_entry p;
int pos; int pos;
error_code err; error_code err;
@ -614,27 +637,46 @@ bool node_impl::refresh_storage() {
int height = p.dict_find_int_value("height"); int height = p.dict_find_int_value("height");
if( height > getBestHeight() ) { if( height > getBestHeight() ) {
continue; // how? skip = true; // invalid? how come?
} }
const lazy_entry *target = p.dict_find_dict("target"); const lazy_entry *target = p.dict_find_dict("target");
if( !target )
continue;
std::string username = target->dict_find_string_value("n"); std::string username = target->dict_find_string_value("n");
std::string resource = target->dict_find_string_value("r"); std::string resource = target->dict_find_string_value("r");
bool multi = (target->dict_find_string_value("t") == "m"); bool multi = (target->dict_find_string_value("t") == "m");
// refresh only signed single posts and mentions // probabilistic refresh for users that post a lot.
if( !multi || (multi && resource == "mention") || // note: we don't know the true total number of posts by user, but
(multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) { // rather just what we have stored and still not expired.
std::string resourcePost("post");
if( resource.compare(0, resourcePost.length(), resourcePost) == 0 ) {
int resourceNumber = atoi( resource.c_str() + resourcePost.length() );
std::pair<int,int> &userStats = m_posts_by_user[username];
int knownPosts = userStats.first;
int lastPost = userStats.second;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
printf("node dht: probabilistic post refresh for user: %s (total: %d last: %d cur: %d)\n",
username.c_str(), knownPosts, lastPost, resourceNumber);
#endif
if( resourceNumber < lastPost - 100 && knownPosts > 25 ) {
double p = 25. / (knownPosts - 25);
if( getRandom() > p ) {
skip = true;
}
}
}
// refresh only signed single posts and mentions
if( !skip &&
(!multi || resource == "mention" || local_and_recent) ) {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
printf("node dht: refreshing storage: [%s,%s,%s]\n", printf("node dht: refreshing storage: [%s,%s,%s]\n",
username.c_str(), username.c_str(),
resource.c_str(), resource.c_str(),
target->dict_find_string_value("t").c_str()); target->dict_find_string_value("t").c_str());
#endif #endif
processEntryForHashtags(p);
entry entryP; entry entryP;
entryP = p; // lazy to non-lazy entryP = p; // lazy to non-lazy
@ -646,21 +688,20 @@ bool node_impl::refresh_storage() {
entryP, item.sig_p, item.sig_user), item.confirmed)); entryP, item.sig_p, item.sig_user), item.confirmed));
ta->start(); ta->start();
did_something = true; did_something = true;
item.next_refresh_time = getNextRefreshTime(item.confirmed);
} }
// we are supposed to have refreshed this item by now (but we may have not - see above)
// so regardless of what we actually did, calculate when the next refresh is due
item.next_refresh_time = getNextRefreshTime(item.confirmed);
if( m_next_storage_refresh > item.next_refresh_time ) { if( m_next_storage_refresh > item.next_refresh_time ) {
m_next_storage_refresh = item.next_refresh_time; m_next_storage_refresh = item.next_refresh_time;
} }
} }
} }
/* /*
printf("node dht: next storage refresh in %s\n", printf("node dht: next storage refresh in %d\n",
boost::posix_time::to_simple_string(sleepToRefresh).c_str() ); m_next_storage_refresh - now );
*/ */
return did_something; return did_something;
} }
@ -781,26 +822,21 @@ void node_impl::load_storage(entry const* e) {
item.confirmed = (confirmed->integer() != 0); item.confirmed = (confirmed->integer() != 0);
} }
// just for printf for now
bool expired = has_expired(item); bool expired = has_expired(item);
#ifdef ENABLE_DHT_ITEM_EXPIRE
if( !expired ) { if( !expired ) {
#endif
lazy_entry p; lazy_entry p;
int pos; int pos;
error_code err; error_code err;
// FIXME: optimize to avoid bdecode (store seq separated, etc) // FIXME: optimize to avoid bdecode (store seq separated, etc)
int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500);
processEntryForHashtags(p); process_newly_stored_entry(p);
// wait 1 minute (to load torrents, etc.) // wait 1 minute (to load torrents, etc.)
// randomize refresh time // randomize refresh time
item.next_refresh_time = now + minutes(1) + refresh_interval * getRandom(); item.next_refresh_time = now + minutes(1) + refresh_interval * getRandom();
to_add.push_back(item); to_add.push_back(item);
#ifdef ENABLE_DHT_ITEM_EXPIRE
} }
#endif
} }
m_storage_table.insert(std::make_pair(target, to_add)); m_storage_table.insert(std::make_pair(target, to_add));
} }
@ -1409,8 +1445,10 @@ void node_impl::incoming_request(msg const& m, entry& e)
dht_storage_item item(str_p, msg_keys[mk_sig_p], msg_keys[mk_sig_user]); dht_storage_item item(str_p, msg_keys[mk_sig_p], msg_keys[mk_sig_user]);
std::pair<char const*, int> bufv = msg_keys[mk_v]->data_section(); std::pair<char const*, int> bufv = msg_keys[mk_v]->data_section();
store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0, if( store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0,
msg_keys[mk_height]->int_value(), bufv); msg_keys[mk_height]->int_value(), bufv) ) {
process_newly_stored_entry(*msg_keys[mk_p]);
}
} }
else if (strcmp(query, "getData") == 0) else if (strcmp(query, "getData") == 0)
{ {
@ -1536,9 +1574,11 @@ void node_impl::incoming_request(msg const& m, entry& e)
} }
} }
void node_impl::store_dht_item(dht_storage_item &item, const big_number &target, bool node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
bool multi, int seq, int height, std::pair<char const*, int> &bufv) bool multi, int seq, int height, std::pair<char const*, int> &bufv)
{ {
bool stored = false;
item.next_refresh_time = getNextRefreshTime(item.confirmed); item.next_refresh_time = getNextRefreshTime(item.confirmed);
if( m_next_storage_refresh > item.next_refresh_time ) { if( m_next_storage_refresh > item.next_refresh_time ) {
m_next_storage_refresh = item.next_refresh_time; m_next_storage_refresh = item.next_refresh_time;
@ -1557,6 +1597,7 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
boost::tie(i, boost::tuples::ignore) = m_storage_table.insert( boost::tie(i, boost::tuples::ignore) = m_storage_table.insert(
std::make_pair(target, to_add)); std::make_pair(target, to_add));
stored = true;
} else { } else {
dht_storage_list_t & lsto = i->second; dht_storage_list_t & lsto = i->second;
@ -1574,10 +1615,11 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
if( !multi ) { if( !multi ) {
if( seq > p.dict_find_int("seq")->int_value() ) { if( seq > p.dict_find_int("seq")->int_value() ) {
olditem = item; olditem = item;
stored = true;
} else { } else {
// don't report this error (because of refresh storage) // don't report this error (because of refresh storage)
//incoming_error(e, "old sequence number"); //incoming_error(e, "old sequence number");
return; break;
} }
} else { } else {
// compare contents before adding to the list // compare contents before adding to the list
@ -1597,12 +1639,14 @@ void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
if(multi && j == rend) { if(multi && j == rend) {
// new entry // new entry
lsto.insert(insert_pos, item); lsto.insert(insert_pos, item);
stored = true;
} }
if(lsto.size() > m_settings.max_entries_per_multi) { if(lsto.size() > m_settings.max_entries_per_multi) {
lsto.resize(m_settings.max_entries_per_multi); lsto.resize(m_settings.max_entries_per_multi);
} }
} }
return stored;
} }
} } // namespace libtorrent::dht } } // namespace libtorrent::dht

2
src/clientversion.h

@ -8,7 +8,7 @@
// These need to be macros, as version.cpp's and bitcoin-qt.rc's voodoo requires it // These need to be macros, as version.cpp's and bitcoin-qt.rc's voodoo requires it
#define CLIENT_VERSION_MAJOR 0 #define CLIENT_VERSION_MAJOR 0
#define CLIENT_VERSION_MINOR 9 #define CLIENT_VERSION_MINOR 9
#define CLIENT_VERSION_REVISION 23 #define CLIENT_VERSION_REVISION 24
#define CLIENT_VERSION_BUILD 0 #define CLIENT_VERSION_BUILD 0
// Set to true for release, false for prerelease or test build // Set to true for release, false for prerelease or test build

Loading…
Cancel
Save