Browse Source

store dhtput data locally so it will be refreshed like the other dht entries we maintain.

should help with missing DHT entries (issue #165), although i think #165 would benefit of a more agressive (faster) retrying of remote stores.
miguelfreitas
Miguel Freitas 11 years ago
parent
commit
ad83ba061c
  1. 6
      libtorrent/include/libtorrent/kademlia/node.hpp
  2. 142
      libtorrent/src/kademlia/node.cpp
  3. 2
      src/clientversion.h

6
libtorrent/include/libtorrent/kademlia/node.hpp

@ -119,8 +119,10 @@ struct dht_storage_item
{ {
// FIXME: optimize so bdecode is not needed all the time // FIXME: optimize so bdecode is not needed all the time
dht_storage_item() : p(), sig_p(), sig_user() {} dht_storage_item() : p(), sig_p(), sig_user() {}
dht_storage_item(std::string &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user) dht_storage_item(std::string const &_p, lazy_entry const *_sig_p, lazy_entry const *_sig_user)
: p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()) {} : p(_p), sig_p(_sig_p->string_value()), sig_user(_sig_user->string_value()) {}
dht_storage_item(std::string const &_p, std::string const &_sig_p, std::string const &_sig_user)
: p(_p), sig_p(_sig_p), sig_user(_sig_user) {}
std::string p; std::string p;
std::string sig_p; std::string sig_p;
std::string sig_user; std::string sig_user;
@ -282,6 +284,8 @@ private:
std::set<traversal_algorithm*> m_running_requests; std::set<traversal_algorithm*> m_running_requests;
void incoming_request(msg const& h, entry& e); void incoming_request(msg const& h, entry& e);
void store_dht_item(dht_storage_item &item, big_number const &target,
bool multi, int seq, int height, std::pair<char const*, int> &bufv);
node_id m_id; node_id m_id;

142
libtorrent/src/kademlia/node.cpp

@ -444,11 +444,13 @@ void node_impl::putData(std::string const &username, std::string const &resource
if (seq >= 0 && !multi) p["seq"] = seq; if (seq >= 0 && !multi) p["seq"] = seq;
p["v"] = value; p["v"] = value;
p["time"] = timeutc; p["time"] = timeutc;
p["height"] = getBestHeight()-1; // be conservative int height = getBestHeight()-1; // be conservative
p["height"] = height;
std::vector<char> pbuf; std::vector<char> pbuf;
bencode(std::back_inserter(pbuf), p); bencode(std::back_inserter(pbuf), p);
std::string sig_p = createSignature(std::string(pbuf.data(),pbuf.size()), sig_user); std::string str_p = std::string(pbuf.data(),pbuf.size());
std::string sig_p = createSignature(str_p, sig_user);
if( !sig_p.size() ) { if( !sig_p.size() ) {
printf("putData: createSignature error (this should have been caught earlier)\n"); printf("putData: createSignature error (this should have been caught earlier)\n");
return; return;
@ -459,6 +461,15 @@ void node_impl::putData(std::string const &username, std::string const &resource
boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi, boost::intrusive_ptr<dht_get> ta(new dht_get(*this, username, resource, multi,
boost::bind(&nop), boost::bind(&nop),
boost::bind(&putData_fun, _1, boost::ref(*this), p, sig_p, sig_user), true)); boost::bind(&putData_fun, _1, boost::ref(*this), p, sig_p, sig_user), true));
// store it locally so it will be automatically refreshed with the rest
dht_storage_item item(str_p, sig_p, sig_user);
std::vector<char> vbuf;
bencode(std::back_inserter(vbuf), value);
std::pair<char const*, int> bufv = std::make_pair(vbuf.data(), vbuf.size());
store_dht_item(item, ta->target(), multi, seq, height, bufv);
// now send it to the network (start transversal algorithm)
ta->start(); ta->start();
} }
@ -1327,71 +1338,13 @@ void node_impl::incoming_request(msg const& m, entry& e)
// attack this resource by storing value into non-final nodes. // attack this resource by storing value into non-final nodes.
if( !possiblyNeighbor ) { if( !possiblyNeighbor ) {
printf("putData with possiblyNeighbor=false, ignoring request.\n"); printf("putData with possiblyNeighbor=false, ignoring request.\n");
return;
} }
dht_storage_item item(str_p, msg_keys[mk_sig_p], msg_keys[mk_sig_user]); dht_storage_item item(str_p, msg_keys[mk_sig_p], msg_keys[mk_sig_user]);
dht_storage_table_t::iterator i = m_storage_table.find(target);
if (i == m_storage_table.end()) {
// make sure we don't add too many items
if (int(m_storage_table.size()) >= m_settings.max_dht_items)
{
// FIXME: erase one? preferably a multi
}
dht_storage_list_t to_add;
to_add.push_back(item);
boost::tie(i, boost::tuples::ignore) = m_storage_table.insert(
std::make_pair(target, to_add));
} else {
dht_storage_list_t & lsto = i->second;
dht_storage_list_t::reverse_iterator j, rend(lsto.rend());
dht_storage_list_t::iterator insert_pos = lsto.end();
for( j = lsto.rbegin(); j != rend; ++j) {
dht_storage_item &olditem = *j;
lazy_entry p;
int pos;
error_code err;
// FIXME: optimize to avoid bdecode (store seq separated, etc)
int ret = lazy_bdecode(olditem.p.data(), olditem.p.data() + olditem.p.size(), p, err, &pos, 10, 500);
if( !multi ) {
if( msg_keys[mk_seq]->int_value() > p.dict_find_int("seq")->int_value() ) {
olditem = item;
} else {
// don't report this error (because of refresh storage)
//incoming_error(e, "old sequence number");
return;
}
} else {
std::pair<char const*, int> bufv = msg_keys[mk_v]->data_section(); std::pair<char const*, int> bufv = msg_keys[mk_v]->data_section();
store_dht_item(item, target, multi, !multi ? msg_keys[mk_seq]->int_value() : 0,
// compare contents before adding to the list msg_keys[mk_height]->int_value(), bufv);
std::pair<char const*, int> bufoldv = p.dict_find("v")->data_section();
if( bufv.second == bufoldv.second && !memcmp(bufv.first, bufoldv.first,bufv.second) ) {
// break so it wont be inserted
break;
}
// if new entry is newer than existing one, it will be inserted before
if( msg_keys[mk_height]->int_value() >= p.dict_find_int_value("height") ) {
insert_pos = j.base();
insert_pos--;
}
}
}
if(multi && j == rend) {
// new entry
lsto.insert(insert_pos, item);
}
if(lsto.size() > m_settings.max_entries_per_multi) {
lsto.resize(m_settings.max_entries_per_multi);
}
}
} }
else if (strcmp(query, "getData") == 0) else if (strcmp(query, "getData") == 0)
{ {
@ -1517,6 +1470,69 @@ void node_impl::incoming_request(msg const& m, entry& e)
} }
} }
void node_impl::store_dht_item(dht_storage_item &item, const big_number &target,
bool multi, int seq, int height, std::pair<char const*, int> &bufv)
{
dht_storage_table_t::iterator i = m_storage_table.find(target);
if (i == m_storage_table.end()) {
// make sure we don't add too many items
if (int(m_storage_table.size()) >= m_settings.max_dht_items)
{
// FIXME: erase one? preferably a multi
}
dht_storage_list_t to_add;
to_add.push_back(item);
boost::tie(i, boost::tuples::ignore) = m_storage_table.insert(
std::make_pair(target, to_add));
} else {
dht_storage_list_t & lsto = i->second;
dht_storage_list_t::reverse_iterator j, rend(lsto.rend());
dht_storage_list_t::iterator insert_pos = lsto.end();
for( j = lsto.rbegin(); j != rend; ++j) {
dht_storage_item &olditem = *j;
lazy_entry p;
int pos;
error_code err;
// FIXME: optimize to avoid bdecode (store seq separated, etc)
int ret = lazy_bdecode(olditem.p.data(), olditem.p.data() + olditem.p.size(), p, err, &pos, 10, 500);
if( !multi ) {
if( seq > p.dict_find_int("seq")->int_value() ) {
olditem = item;
} else {
// don't report this error (because of refresh storage)
//incoming_error(e, "old sequence number");
return;
}
} else {
// compare contents before adding to the list
std::pair<char const*, int> bufoldv = p.dict_find("v")->data_section();
if( bufv.second == bufoldv.second && !memcmp(bufv.first, bufoldv.first,bufv.second) ) {
// break so it wont be inserted
break;
}
// if new entry is newer than existing one, it will be inserted before
if( height >= p.dict_find_int_value("height") ) {
insert_pos = j.base();
insert_pos--;
}
}
}
if(multi && j == rend) {
// new entry
lsto.insert(insert_pos, item);
}
if(lsto.size() > m_settings.max_entries_per_multi) {
lsto.resize(m_settings.max_entries_per_multi);
}
}
}
} } // namespace libtorrent::dht } } // namespace libtorrent::dht

2
src/clientversion.h

@ -8,7 +8,7 @@
// These need to be macros, as version.cpp's and bitcoin-qt.rc's voodoo requires it // These need to be macros, as version.cpp's and bitcoin-qt.rc's voodoo requires it
#define CLIENT_VERSION_MAJOR 0 #define CLIENT_VERSION_MAJOR 0
#define CLIENT_VERSION_MINOR 9 #define CLIENT_VERSION_MINOR 9
#define CLIENT_VERSION_REVISION 19 #define CLIENT_VERSION_REVISION 20
#define CLIENT_VERSION_BUILD 0 #define CLIENT_VERSION_BUILD 0
// Set to true for release, false for prerelease or test build // Set to true for release, false for prerelease or test build

Loading…
Cancel
Save