mirror of
https://github.com/twisterarmy/twister-core.git
synced 2025-02-02 09:54:29 +00:00
suggestion for DHT refresh
This commit is contained in:
parent
6c4c656be4
commit
8429d95dba
@ -52,6 +52,9 @@ POSSIBILITY OF SUCH DAMAGE.
|
|||||||
|
|
||||||
#include <boost/cstdint.hpp>
|
#include <boost/cstdint.hpp>
|
||||||
#include <boost/ref.hpp>
|
#include <boost/ref.hpp>
|
||||||
|
#include <boost/random.hpp>
|
||||||
|
#include <boost/nondet_random.hpp>
|
||||||
|
#include <boost/random/mersenne_twister.hpp>
|
||||||
|
|
||||||
#include "libtorrent/socket.hpp"
|
#include "libtorrent/socket.hpp"
|
||||||
|
|
||||||
@ -118,18 +121,20 @@ struct torrent_entry
|
|||||||
struct dht_storage_item
|
struct dht_storage_item
|
||||||
{
|
{
|
||||||
// FIXME: optimize so bdecode is not needed all the time
|
// FIXME: optimize so bdecode is not needed all the time
|
||||||
dht_storage_item() : p(), sig_p(), sig_user(), local_add_time(0) {}
|
dht_storage_item() : p(), sig_p(), sig_user(), local_add_time(0), confirmed(false), next_refresh_time(0) {}
|
||||||
dht_storage_item(std::string const &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user)
|
dht_storage_item(std::string const &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user)
|
||||||
: p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()),
|
: p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()),
|
||||||
local_add_time(0) {}
|
local_add_time(0), confirmed(false), next_refresh_time(0) {}
|
||||||
dht_storage_item(std::string const &_p, std::string const &_sig_p, std::string const &_sig_user)
|
dht_storage_item(std::string const &_p, std::string const &_sig_p, std::string const &_sig_user)
|
||||||
: p(_p), sig_p(_sig_p), sig_user(_sig_user), local_add_time(0) {}
|
: p(_p), sig_p(_sig_p), sig_user(_sig_user), local_add_time(0), confirmed(false), next_refresh_time(0) {}
|
||||||
std::string p;
|
std::string p;
|
||||||
std::string sig_p;
|
std::string sig_p;
|
||||||
std::string sig_user;
|
std::string sig_user;
|
||||||
boost::int64_t local_add_time;
|
boost::int64_t local_add_time;
|
||||||
// the last time we heard about this
|
// the last time we heard about this
|
||||||
//ptime last_seen;
|
//ptime last_seen;
|
||||||
|
bool confirmed;
|
||||||
|
ptime next_refresh_time;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -300,14 +305,16 @@ private:
|
|||||||
|
|
||||||
ptime m_last_tracker_tick;
|
ptime m_last_tracker_tick;
|
||||||
ptime m_next_storage_refresh;
|
ptime m_next_storage_refresh;
|
||||||
int m_refresh_per_tick;
|
|
||||||
std::pair<node_id, int> m_last_refreshed_item;
|
|
||||||
|
|
||||||
// secret random numbers used to create write tokens
|
// secret random numbers used to create write tokens
|
||||||
int m_secret[2];
|
int m_secret[2];
|
||||||
|
|
||||||
alert_dispatcher* m_post_alert;
|
alert_dispatcher* m_post_alert;
|
||||||
udp_socket_interface* m_sock;
|
udp_socket_interface* m_sock;
|
||||||
|
|
||||||
|
boost::mt19937 m_random_seed;
|
||||||
|
boost::uniform_real<> m_random_dist;
|
||||||
|
boost::variate_generator<boost::mt19937&, boost::uniform_real<> > m_random;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -107,9 +107,10 @@ node_impl::node_impl(alert_dispatcher* alert_disp
|
|||||||
, m_rpc(m_id, m_table, sock, observer)
|
, m_rpc(m_id, m_table, sock, observer)
|
||||||
, m_last_tracker_tick(time_now())
|
, m_last_tracker_tick(time_now())
|
||||||
, m_next_storage_refresh(time_now())
|
, m_next_storage_refresh(time_now())
|
||||||
, m_last_refreshed_item()
|
|
||||||
, m_post_alert(alert_disp)
|
, m_post_alert(alert_disp)
|
||||||
, m_sock(sock)
|
, m_sock(sock)
|
||||||
|
, m_random_dist(0.0, 1.0)
|
||||||
|
, m_random(m_random_seed, m_random_dist)
|
||||||
{
|
{
|
||||||
m_secret[0] = random();
|
m_secret[0] = random();
|
||||||
m_secret[1] = std::rand();
|
m_secret[1] = std::rand();
|
||||||
@ -308,6 +309,11 @@ namespace
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void putData_confirm(dht_storage_item& item)
|
||||||
|
{
|
||||||
|
item.confirmed = true;
|
||||||
|
}
|
||||||
|
|
||||||
void putData_fun(std::vector<std::pair<node_entry, std::string> > const& v,
|
void putData_fun(std::vector<std::pair<node_entry, std::string> > const& v,
|
||||||
node_impl& node,
|
node_impl& node,
|
||||||
entry const &p, std::string const &sig_p, std::string const &sig_user)
|
entry const &p, std::string const &sig_p, std::string const &sig_user)
|
||||||
@ -535,22 +541,17 @@ static void processEntryForHashtags(lazy_entry &p)
|
|||||||
|
|
||||||
bool node_impl::refresh_storage() {
|
bool node_impl::refresh_storage() {
|
||||||
bool did_something = false;
|
bool did_something = false;
|
||||||
bool refresh_next_item = false;
|
|
||||||
int num_refreshable = 0;
|
|
||||||
|
|
||||||
for (dht_storage_table_t::const_iterator i = m_storage_table.begin(),
|
ptime const now = time_now();
|
||||||
|
m_next_storage_refresh = now + minutes(10);
|
||||||
|
|
||||||
|
for (dht_storage_table_t::iterator i = m_storage_table.begin(),
|
||||||
end(m_storage_table.end()); i != end; ++i )
|
end(m_storage_table.end()); i != end; ++i )
|
||||||
{
|
{
|
||||||
dht_storage_list_t const& lsto = i->second;
|
dht_storage_list_t& lsto = i->second;
|
||||||
dht_storage_list_t::const_iterator j(lsto.begin()), jEnd(lsto.end());
|
dht_storage_list_t::iterator j(lsto.begin()), jEnd(lsto.end());
|
||||||
for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) {
|
for(int jIdx = 0; j != jEnd; ++j, ++jIdx ) {
|
||||||
dht_storage_item const& item = *j;
|
dht_storage_item& item = *j;
|
||||||
|
|
||||||
if( std::make_pair(i->first,jIdx) == m_last_refreshed_item ) {
|
|
||||||
refresh_next_item = true;
|
|
||||||
num_refreshable++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef ENABLE_DHT_ITEM_EXPIRE
|
#ifdef ENABLE_DHT_ITEM_EXPIRE
|
||||||
if( has_expired(item, true) ) {
|
if( has_expired(item, true) ) {
|
||||||
@ -558,6 +559,13 @@ bool node_impl::refresh_storage() {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if( item.next_refresh_time > now ) {
|
||||||
|
if( m_next_storage_refresh > item.next_refresh_time ) {
|
||||||
|
m_next_storage_refresh = item.next_refresh_time;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
lazy_entry p;
|
lazy_entry p;
|
||||||
int pos;
|
int pos;
|
||||||
error_code err;
|
error_code err;
|
||||||
@ -577,11 +585,7 @@ bool node_impl::refresh_storage() {
|
|||||||
// refresh only signed single posts and mentions
|
// refresh only signed single posts and mentions
|
||||||
if( !multi || (multi && resource == "mention") ||
|
if( !multi || (multi && resource == "mention") ||
|
||||||
(multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) {
|
(multi && item.local_add_time && item.local_add_time + 60*60*24*2 > time(NULL)) ) {
|
||||||
num_refreshable++;
|
|
||||||
|
|
||||||
if( refresh_next_item && m_refresh_per_tick ) {
|
|
||||||
--m_refresh_per_tick;
|
|
||||||
m_last_refreshed_item = std::make_pair(i->first,jIdx);
|
|
||||||
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
#ifdef TORRENT_DHT_VERBOSE_LOGGING
|
||||||
printf("node dht: refreshing storage: [%s,%s,%s]\n",
|
printf("node dht: refreshing storage: [%s,%s,%s]\n",
|
||||||
username.c_str(),
|
username.c_str(),
|
||||||
@ -589,39 +593,29 @@ bool node_impl::refresh_storage() {
|
|||||||
target->dict_find_string_value("t").c_str());
|
target->dict_find_string_value("t").c_str());
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
processEntryForHashtags(p);
|
processEntryForHashtags(p);
|
||||||
|
|
||||||
entry entryP;
|
entry entryP;
|
||||||
entryP = p; // lazy to non-lazy
|
entryP = p; // lazy to non-lazy
|
||||||
|
|
||||||
// search for nodes with ids close to id or with peers
|
// search for nodes with ids close to id or with peers
|
||||||
// for info-hash id. then send putData to them.
|
// for info-hash id. then send putData to them.
|
||||||
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
|
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
|
||||||
boost::bind(&nop),
|
boost::bind(&putData_confirm, item),
|
||||||
boost::bind(&putData_fun, _1, boost::ref(*this),
|
boost::bind(&putData_fun, _1, boost::ref(*this),
|
||||||
entryP, item.sig_p, item.sig_user), true));
|
entryP, item.sig_p, item.sig_user), !item.confirmed));
|
||||||
ta->start();
|
ta->start();
|
||||||
did_something = true;
|
did_something = true;
|
||||||
|
|
||||||
|
// add 10% diffusion to next refresh time
|
||||||
|
item.next_refresh_time = now + minutes(item.confirmed ? 60 : 1) * (1. + 0.1 * (2. * m_random() - 1.));
|
||||||
|
if( m_next_storage_refresh > item.next_refresh_time ) {
|
||||||
|
m_next_storage_refresh = item.next_refresh_time;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if( !did_something && m_storage_table.size() ) {
|
|
||||||
m_last_refreshed_item = std::make_pair(m_storage_table.begin()->first,0);
|
|
||||||
}
|
|
||||||
|
|
||||||
const time_duration tickInterval = seconds(5);
|
|
||||||
const time_duration fullRefreshInterval = minutes(30);
|
|
||||||
const time_duration sleepInterval = minutes(10);
|
|
||||||
const time_duration sleepToRefresh = std::min( sleepInterval, fullRefreshInterval / (num_refreshable ? num_refreshable : 1) );
|
|
||||||
m_next_storage_refresh = time_now() + sleepToRefresh;
|
|
||||||
if( sleepToRefresh > tickInterval ) {
|
|
||||||
m_refresh_per_tick = 1;
|
|
||||||
} else {
|
|
||||||
m_refresh_per_tick = tickInterval.diff/sleepToRefresh.diff;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
printf("node dht: next storage refresh in %s\n",
|
printf("node dht: next storage refresh in %s\n",
|
||||||
boost::posix_time::to_simple_string(sleepToRefresh).c_str() );
|
boost::posix_time::to_simple_string(sleepToRefresh).c_str() );
|
||||||
@ -704,6 +698,8 @@ bool node_impl::save_storage(entry &save) const {
|
|||||||
entry_item["sig_user"] = item.sig_user;
|
entry_item["sig_user"] = item.sig_user;
|
||||||
if( item.local_add_time )
|
if( item.local_add_time )
|
||||||
entry_item["local_add_time"] = item.local_add_time;
|
entry_item["local_add_time"] = item.local_add_time;
|
||||||
|
if( item.confirmed )
|
||||||
|
entry_item["confirmed"] = item.confirmed ? 0 : 1;
|
||||||
save_list.list().push_back(entry_item);
|
save_list.list().push_back(entry_item);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -720,6 +716,8 @@ void node_impl::load_storage(entry const* e) {
|
|||||||
if( !e || e->type() != entry::dictionary_t)
|
if( !e || e->type() != entry::dictionary_t)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
ptime now = time_now();
|
||||||
|
|
||||||
printf("node dht: loading storage... (%lu node_id keys)\n", e->dict().size());
|
printf("node dht: loading storage... (%lu node_id keys)\n", e->dict().size());
|
||||||
|
|
||||||
for (entry::dictionary_type::const_iterator i = e->dict().begin();
|
for (entry::dictionary_type::const_iterator i = e->dict().begin();
|
||||||
@ -738,6 +736,13 @@ void node_impl::load_storage(entry const* e) {
|
|||||||
entry const *local_add_time( j->find_key("local_add_time") );
|
entry const *local_add_time( j->find_key("local_add_time") );
|
||||||
if(local_add_time)
|
if(local_add_time)
|
||||||
item.local_add_time = local_add_time->integer();
|
item.local_add_time = local_add_time->integer();
|
||||||
|
entry const *confirmed( j->find_key("confirmed") );
|
||||||
|
if(confirmed) {
|
||||||
|
item.confirmed = (bool) confirmed->integer();
|
||||||
|
} else {
|
||||||
|
// compatibility with twister 0.9.22 and below
|
||||||
|
item.confirmed = true;
|
||||||
|
}
|
||||||
|
|
||||||
// just for printf for now
|
// just for printf for now
|
||||||
bool expired = has_expired(item);
|
bool expired = has_expired(item);
|
||||||
@ -751,6 +756,9 @@ void node_impl::load_storage(entry const* e) {
|
|||||||
int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500);
|
int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500);
|
||||||
processEntryForHashtags(p);
|
processEntryForHashtags(p);
|
||||||
|
|
||||||
|
// randomize refresh time
|
||||||
|
item.next_refresh_time = now + minutes(item.confirmed ? 60 : 1) * m_random();
|
||||||
|
|
||||||
to_add.push_back(item);
|
to_add.push_back(item);
|
||||||
#ifdef ENABLE_DHT_ITEM_EXPIRE
|
#ifdef ENABLE_DHT_ITEM_EXPIRE
|
||||||
}
|
}
|
||||||
@ -1493,6 +1501,7 @@ void node_impl::incoming_request(msg const& m, entry& e)
|
|||||||
void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
|
void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
|
||||||
bool multi, int seq, int height, std::pair<char const*, int> &bufv)
|
bool multi, int seq, int height, std::pair<char const*, int> &bufv)
|
||||||
{
|
{
|
||||||
|
item.next_refresh_time = time_now() + minutes(item.confirmed ? 60 : 1);
|
||||||
dht_storage_table_t::iterator i = m_storage_table.find(target);
|
dht_storage_table_t::iterator i = m_storage_table.find(target);
|
||||||
if (i == m_storage_table.end()) {
|
if (i == m_storage_table.end()) {
|
||||||
// make sure we don't add too many items
|
// make sure we don't add too many items
|
||||||
|
Loading…
x
Reference in New Issue
Block a user