diff --git a/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp b/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp index 8bdd05c3..ee8408ca 100644 --- a/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp +++ b/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp @@ -84,6 +84,8 @@ namespace libtorrent { namespace dht void start(entry const& bootstrap); void stop(); + bool refresh_storage(); + void add_node(udp::endpoint node); void add_node(std::pair const& node); void add_router_node(udp::endpoint const& node); diff --git a/libtorrent/include/libtorrent/kademlia/node.hpp b/libtorrent/include/libtorrent/kademlia/node.hpp index 405978a0..4ca06220 100644 --- a/libtorrent/include/libtorrent/kademlia/node.hpp +++ b/libtorrent/include/libtorrent/kademlia/node.hpp @@ -179,7 +179,8 @@ public: virtual ~node_impl() {} void tick(); - void refresh(node_id const& id, find_data::nodes_callback const& f); + bool refresh_storage(); + void refresh(node_id const& id, find_data::nodes_callback const& f); void bootstrap(std::vector const& nodes , find_data::nodes_callback const& f); void add_router_node(udp::endpoint router); diff --git a/libtorrent/src/kademlia/dht_get.cpp b/libtorrent/src/kademlia/dht_get.cpp index 3edee1f8..ca45a83c 100644 --- a/libtorrent/src/kademlia/dht_get.cpp +++ b/libtorrent/src/kademlia/dht_get.cpp @@ -125,7 +125,7 @@ void dht_get_observer::reply(msg const& m) values_list.push_back(entry()); values_list.back() = *e; } - printf("dht_get::reply from %s:%d with %d entries\n", m.addr.address().to_string().c_str(), m.addr.port(), values_list.size()); + //printf("dht_get::reply from %s:%d with %d entries\n", m.addr.address().to_string().c_str(), m.addr.port(), values_list.size()); static_cast(m_algorithm.get())->got_data(values_list); } diff --git a/libtorrent/src/kademlia/dht_tracker.cpp b/libtorrent/src/kademlia/dht_tracker.cpp index c56a7411..4c496b93 100644 --- a/libtorrent/src/kademlia/dht_tracker.cpp +++ b/libtorrent/src/kademlia/dht_tracker.cpp @@ -283,6 +283,11 @@ namespace libtorrent { namespace dht m_host_resolver.cancel(); } + bool dht_tracker::refresh_storage() + { + return m_dht.refresh_storage(); + } + void dht_tracker::dht_status(session_status& s) { m_dht.status(s); diff --git a/libtorrent/src/kademlia/node.cpp b/libtorrent/src/kademlia/node.cpp index 4a0bc797..4637b140 100644 --- a/libtorrent/src/kademlia/node.cpp +++ b/libtorrent/src/kademlia/node.cpp @@ -305,12 +305,9 @@ namespace } } - // [MF] FIXME: putData_fun must receive {p, sig_p} (no need to sign it several times) void putData_fun(std::vector > const& v, node_impl& node, - std::string const &username, std::string const &resource, bool multi, - entry const &value, std::string const &sig_user, - boost::int64_t timeutc, int seq) + entry const &p, std::string const &sig_p, std::string const &sig_user) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "sending putData [ username: " << username @@ -342,24 +339,7 @@ namespace entry& a = e["x"]; a["token"] = i->second; - entry& p = a["p"]; - entry& target = p["target"]; - target["n"] = username; - target["r"] = resource; - target["t"] = (multi) ? "m" : "s"; - if (seq >= 0 && !multi) p["seq"] = seq; - p["v"] = value; - p["time"] = timeutc; - p["height"] = getBestHeight()-1; // be conservative - - std::vector pbuf; - bencode(std::back_inserter(pbuf), p); - std::string sig_p = createSignature(std::string(pbuf.data(),pbuf.size()), sig_user); - if( !sig_p.size() ) { - printf("putData_fun: createSignature error (this should have been caught earlier)\n"); - return; - } - + a["p"] = p; a["sig_p"] = sig_p; a["sig_user"] = sig_user; @@ -448,13 +428,31 @@ void node_impl::putData(std::string const &username, std::string const &resource #endif printf("putData: username=%s,res=%s,multi=%d sig_user=%s\n", username.c_str(), resource.c_str(), multi, sig_user.c_str()); + + // construct p dictionary and sign it + entry p; + entry& target = p["target"]; + target["n"] = username; + target["r"] = resource; + target["t"] = (multi) ? "m" : "s"; + if (seq >= 0 && !multi) p["seq"] = seq; + p["v"] = value; + p["time"] = timeutc; + p["height"] = getBestHeight()-1; // be conservative + + std::vector pbuf; + bencode(std::back_inserter(pbuf), p); + std::string sig_p = createSignature(std::string(pbuf.data(),pbuf.size()), sig_user); + if( !sig_p.size() ) { + printf("putData: createSignature error (this should have been caught earlier)\n"); + return; + } + // search for nodes with ids close to id or with peers // for info-hash id. then send putData to them. boost::intrusive_ptr ta(new dht_get(*this, username, resource, multi, boost::bind(&nop), - boost::bind(&putData_fun, _1, boost::ref(*this), - username, resource, multi, - value, sig_user, timeutc, seq), true)); + boost::bind(&putData_fun, _1, boost::ref(*this), p, sig_p, sig_user), true)); ta->start(); } @@ -480,38 +478,60 @@ void node_impl::tick() refresh(target, boost::bind(&nop)); ptime now = time_now(); - if (now - m_last_storage_refresh > minutes(2)) { + if (now - m_last_storage_refresh > minutes(60)) { m_last_storage_refresh = now; + refresh_storage(); + } +} - printf("node dht: refreshing storage...\n"); - - for (dht_storage_table_t::const_iterator i = m_storage_table.begin(), - end(m_storage_table.end()); i != end; ++i ) - { - dht_storage_list_t const& lsto = i->second; - if( lsto.size() == 1 ) { - dht_storage_item const& item = lsto.front(); - - lazy_entry p; - int pos; - error_code err; - // FIXME: optimize to avoid bdecode (store seq separated, etc) - int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); - - const lazy_entry *target = p.dict_find_dict("target"); - - // refresh only signed single posts - if( target->dict_find_string_value("t") == "s" ) { - printf("refresh dht storage: [%s,%s,%s]\n", - target->dict_find_string_value("n").c_str(), - target->dict_find_string_value("r").c_str(), - target->dict_find_string_value("t").c_str()); - } +bool node_impl::refresh_storage() { + bool did_something = false; + + if( m_storage_table.size() == 0 ) + return did_something; + + printf("node dht: refreshing storage...\n"); + + for (dht_storage_table_t::const_iterator i = m_storage_table.begin(), + end(m_storage_table.end()); i != end; ++i ) + { + dht_storage_list_t const& lsto = i->second; + if( lsto.size() == 1 ) { + dht_storage_item const& item = lsto.front(); + + lazy_entry p; + int pos; + error_code err; + // FIXME: optimize to avoid bdecode (store seq separated, etc) + int ret = lazy_bdecode(item.p.data(), item.p.data() + item.p.size(), p, err, &pos, 10, 500); + + const lazy_entry *target = p.dict_find_dict("target"); + std::string username = target->dict_find_string_value("n"); + std::string resource = target->dict_find_string_value("r"); + bool multi = (target->dict_find_string_value("t") == "m"); + + // refresh only signed single posts + if( !multi ) { + printf("refresh dht storage: [%s,%s,%s]\n", + username.c_str(), + resource.c_str(), + target->dict_find_string_value("t").c_str()); + + entry entryP; + entryP = p; // lazy to non-lazy + + // search for nodes with ids close to id or with peers + // for info-hash id. then send putData to them. + boost::intrusive_ptr ta(new dht_get(*this, username, resource, multi, + boost::bind(&nop), + boost::bind(&putData_fun, _1, boost::ref(*this), + entryP, item.sig_p, item.sig_user), true)); + ta->start(); + did_something = true; } } - - } + return did_something; } time_duration node_impl::connection_timeout() @@ -1129,7 +1149,8 @@ void node_impl::incoming_request(msg const& m, entry& e) if( msg_keys[mk_seq]->int_value() > p.dict_find_int("seq")->int_value() ) { olditem = item; } else { - incoming_error(e, "old sequence number"); + // don't report this error (because of refresh storage) + //incoming_error(e, "old sequence number"); return; } } else { diff --git a/libtorrent/src/session_impl.cpp b/libtorrent/src/session_impl.cpp index abe82c38..0f748d9c 100644 --- a/libtorrent/src/session_impl.cpp +++ b/libtorrent/src/session_impl.cpp @@ -5707,6 +5707,14 @@ retry: void session_impl::stop_dht() { if (!m_dht) return; + + /* FIXME: good idea... unfortunately it doesnt work. + if( m_dht->refresh_storage() ) { + // a small chance to refresh to suceed + sleep(5000); + } + */ + m_udp_socket.unsubscribe(m_dht.get()); m_dht->stop(); m_dht = 0;