implement refreshing dht storage every hour

This commit is contained in:
miguel 2013-08-30 20:25:23 -03:00
parent f01cca3d48
commit 1bae0176f5
6 changed files with 88 additions and 51 deletions

View File

@ -84,6 +84,8 @@ namespace libtorrent { namespace dht
void start(entry const& bootstrap); void start(entry const& bootstrap);
void stop(); void stop();
bool refresh_storage();
void add_node(udp::endpoint node); void add_node(udp::endpoint node);
void add_node(std::pair<std::string, int> const& node); void add_node(std::pair<std::string, int> const& node);
void add_router_node(udp::endpoint const& node); void add_router_node(udp::endpoint const& node);

View File

@ -179,7 +179,8 @@ public:
virtual ~node_impl() {} virtual ~node_impl() {}
void tick(); void tick();
void refresh(node_id const& id, find_data::nodes_callback const& f); bool refresh_storage();
void refresh(node_id const& id, find_data::nodes_callback const& f);
void bootstrap(std::vector<udp::endpoint> const& nodes void bootstrap(std::vector<udp::endpoint> const& nodes
, find_data::nodes_callback const& f); , find_data::nodes_callback const& f);
void add_router_node(udp::endpoint router); void add_router_node(udp::endpoint router);

View File

@ -125,7 +125,7 @@ void dht_get_observer::reply(msg const& m)
values_list.push_back(entry()); values_list.push_back(entry());
values_list.back() = *e; values_list.back() = *e;
} }
printf("dht_get::reply from %s:%d with %d entries\n", m.addr.address().to_string().c_str(), m.addr.port(), values_list.size()); //printf("dht_get::reply from %s:%d with %d entries\n", m.addr.address().to_string().c_str(), m.addr.port(), values_list.size());
static_cast<dht_get*>(m_algorithm.get())->got_data(values_list); static_cast<dht_get*>(m_algorithm.get())->got_data(values_list);
} }

View File

@ -283,6 +283,11 @@ namespace libtorrent { namespace dht
m_host_resolver.cancel(); m_host_resolver.cancel();
} }
bool dht_tracker::refresh_storage()
{
return m_dht.refresh_storage();
}
void dht_tracker::dht_status(session_status& s) void dht_tracker::dht_status(session_status& s)
{ {
m_dht.status(s); m_dht.status(s);

View File

@ -305,12 +305,9 @@ namespace
} }
} }
// [MF] FIXME: putData_fun must receive {p, sig_p} (no need to sign it several times)
void putData_fun(std::vector<std::pair<node_entry, std::string> > const& v, void putData_fun(std::vector<std::pair<node_entry, std::string> > const& v,
node_impl& node, node_impl& node,
std::string const &username, std::string const &resource, bool multi, entry const &p, std::string const &sig_p, std::string const &sig_user)
entry const &value, std::string const &sig_user,
boost::int64_t timeutc, int seq)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "sending putData [ username: " << username TORRENT_LOG(node) << "sending putData [ username: " << username
@ -342,24 +339,7 @@ namespace
entry& a = e["x"]; entry& a = e["x"];
a["token"] = i->second; a["token"] = i->second;
entry& p = a["p"]; a["p"] = p;
entry& target = p["target"];
target["n"] = username;
target["r"] = resource;
target["t"] = (multi) ? "m" : "s";
if (seq >= 0 && !multi) p["seq"] = seq;
p["v"] = value;
p["time"] = timeutc;
p["height"] = getBestHeight()-1; // be conservative
std::vector<char> pbuf;
bencode(std::back_inserter(pbuf), p);
std::string sig_p = createSignature(std::string(pbuf.data(),pbuf.size()), sig_user);
if( !sig_p.size() ) {
printf("putData_fun: createSignature error (this should have been caught earlier)\n");
return;
}
a["sig_p"] = sig_p; a["sig_p"] = sig_p;
a["sig_user"] = sig_user; a["sig_user"] = sig_user;
@ -448,13 +428,31 @@ void node_impl::putData(std::string const &username, std::string const &resource
#endif #endif
printf("putData: username=%s,res=%s,multi=%d sig_user=%s\n", printf("putData: username=%s,res=%s,multi=%d sig_user=%s\n",
username.c_str(), resource.c_str(), multi, sig_user.c_str()); username.c_str(), resource.c_str(), multi, sig_user.c_str());
// construct p dictionary and sign it
entry p;
entry& target = p["target"];
target["n"] = username;
target["r"] = resource;
target["t"] = (multi) ? "m" : "s";
if (seq >= 0 && !multi) p["seq"] = seq;
p["v"] = value;
p["time"] = timeutc;
p["height"] = getBestHeight()-1; // be conservative
std::vector<char> pbuf;
bencode(std::back_inserter(pbuf), p);
std::string sig_p = createSignature(std::string(pbuf.data(),pbuf.size()), sig_user);
if( !sig_p.size() ) {
printf("putData: createSignature error (this should have been caught earlier)\n");
return;
}
// search for nodes with ids close to id or with peers // search for nodes with ids close to id or with peers
// for info-hash id. then send putData to them. // for info-hash id. then send putData to them.
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi, boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
boost::bind(&nop), boost::bind(&nop),
boost::bind(&putData_fun, _1, boost::ref(*this), boost::bind(&putData_fun, _1, boost::ref(*this), p, sig_p, sig_user), true));
username, resource, multi,
value, sig_user, timeutc, seq), true));
ta->start(); ta->start();
} }
@ -480,38 +478,60 @@ void node_impl::tick()
refresh(target, boost::bind(&nop)); refresh(target, boost::bind(&nop));
ptime now = time_now(); ptime now = time_now();
if (now - m_last_storage_refresh > minutes(2)) { if (now - m_last_storage_refresh > minutes(60)) {
m_last_storage_refresh = now; m_last_storage_refresh = now;
refresh_storage();
}
}
printf("node dht: refreshing storage...\n"); bool node_impl::refresh_storage() {
bool did_something = false;
for (dht_storage_table_t::const_iterator i = m_storage_table.begin(), if( m_storage_table.size() == 0 )
end(m_storage_table.end()); i != end; ++i ) return did_something;
{
dht_storage_list_t const& lsto = i->second;
if( lsto.size() == 1 ) {
dht_storage_item const& item = lsto.front();
lazy_entry p; printf("node dht: refreshing storage...\n");
int pos;
error_code err;
// FIXME: optimize to avoid bdecode (store seq separated, etc)
int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500);
const lazy_entry *target = p.dict_find_dict("target"); for (dht_storage_table_t::const_iterator i = m_storage_table.begin(),
end(m_storage_table.end()); i != end; ++i )
{
dht_storage_list_t const& lsto = i->second;
if( lsto.size() == 1 ) {
dht_storage_item const& item = lsto.front();
// refresh only signed single posts lazy_entry p;
if( target->dict_find_string_value("t") == "s" ) { int pos;
printf("refresh dht storage: [%s,%s,%s]\n", error_code err;
target->dict_find_string_value("n").c_str(), // FIXME: optimize to avoid bdecode (store seq separated, etc)
target->dict_find_string_value("r").c_str(), int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500);
target->dict_find_string_value("t").c_str());
} const lazy_entry *target = p.dict_find_dict("target");
std::string username = target->dict_find_string_value("n");
std::string resource = target->dict_find_string_value("r");
bool multi = (target->dict_find_string_value("t") == "m");
// refresh only signed single posts
if( !multi ) {
printf("refresh dht storage: [%s,%s,%s]\n",
username.c_str(),
resource.c_str(),
target->dict_find_string_value("t").c_str());
entry entryP;
entryP = p; // lazy to non-lazy
// search for nodes with ids close to id or with peers
// for info-hash id. then send putData to them.
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
boost::bind(&nop),
boost::bind(&putData_fun, _1, boost::ref(*this),
entryP, item.sig_p, item.sig_user), true));
ta->start();
did_something = true;
} }
} }
} }
return did_something;
} }
time_duration node_impl::connection_timeout() time_duration node_impl::connection_timeout()
@ -1129,7 +1149,8 @@ void node_impl::incoming_request(msg const& m, entry& e)
if( msg_keys[mk_seq]->int_value() > p.dict_find_int("seq")->int_value() ) { if( msg_keys[mk_seq]->int_value() > p.dict_find_int("seq")->int_value() ) {
olditem = item; olditem = item;
} else { } else {
incoming_error(e, "old sequence number"); // don't report this error (because of refresh storage)
//incoming_error(e, "old sequence number");
return; return;
} }
} else { } else {

View File

@ -5707,6 +5707,14 @@ retry:
void session_impl::stop_dht() void session_impl::stop_dht()
{ {
if (!m_dht) return; if (!m_dht) return;
/* FIXME: good idea... unfortunately it doesnt work.
if( m_dht->refresh_storage() ) {
// a small chance to refresh to suceed
sleep(5000);
}
*/
m_udp_socket.unsubscribe(m_dht.get()); m_udp_socket.unsubscribe(m_dht.get());
m_dht->stop(); m_dht->stop();
m_dht = 0; m_dht = 0;