From 5112e73a07a60a049b32aee81192ee61aa5a053f Mon Sep 17 00:00:00 2001 From: miguel Date: Tue, 20 Aug 2013 12:18:03 -0300 Subject: [PATCH] newpostmsg command, working. --- libtorrent/include/libtorrent/torrent.hpp | 4 +-- .../include/libtorrent/torrent_handle.hpp | 2 +- libtorrent/src/bt_peer_connection.cpp | 3 +- libtorrent/src/disk_io_thread.cpp | 6 +++- libtorrent/src/peer_connection.cpp | 2 +- libtorrent/src/session_impl.cpp | 2 +- libtorrent/src/storage.cpp | 1 + libtorrent/src/torrent.cpp | 35 +++++++++++++----- libtorrent/src/torrent_handle.cpp | 15 ++++++-- libtorrent/test/test_torrent.cpp | 2 +- src/bitcoinrpc.cpp | 1 + src/bitcoinrpc.h | 1 + src/twister.cpp | 36 +++++++++++++++++-- 13 files changed, 89 insertions(+), 21 deletions(-) diff --git a/libtorrent/include/libtorrent/torrent.hpp b/libtorrent/include/libtorrent/torrent.hpp index 58a377cf..14597bc6 100644 --- a/libtorrent/include/libtorrent/torrent.hpp +++ b/libtorrent/include/libtorrent/torrent.hpp @@ -184,7 +184,7 @@ namespace libtorrent int seed_rank(session_settings const& s) const; enum flags_t { overwrite_existing = 1 }; - void add_piece(int piece, char const* data, int flags = 0); + void add_piece(int piece, char const* data, int size, int flags = 0); void on_disk_write_complete(int ret, disk_io_job const& j , peer_request p); void on_disk_cache_complete(int ret, disk_io_job const& j); @@ -659,7 +659,7 @@ namespace libtorrent // this is called from the peer_connection // each time a piece has failed the hash // test - void piece_finished(int index, int passed_hash_check); + void piece_finished(int index, int passed_hash_check, int piece_size); // piece_passed is called when a piece passes the hash check // this will tell all peers that we just got his piece diff --git a/libtorrent/include/libtorrent/torrent_handle.hpp b/libtorrent/include/libtorrent/torrent_handle.hpp index 149ce0e3..e2aa5e02 100644 --- a/libtorrent/include/libtorrent/torrent_handle.hpp +++ b/libtorrent/include/libtorrent/torrent_handle.hpp @@ -166,7 +166,7 @@ namespace libtorrent torrent_handle() {} enum flags_t { overwrite_existing = 1 }; - void add_piece(int piece, char const* data, int flags = 0) const; + void add_piece(int piece, char const* data, int size, int flags = 0) const; void read_piece(int piece) const; bool have_piece(int piece) const; diff --git a/libtorrent/src/bt_peer_connection.cpp b/libtorrent/src/bt_peer_connection.cpp index ecdd7667..2784a431 100644 --- a/libtorrent/src/bt_peer_connection.cpp +++ b/libtorrent/src/bt_peer_connection.cpp @@ -227,7 +227,8 @@ namespace libtorrent if (m_state < read_packet_size) return; boost::shared_ptr t = associated_torrent().lock(); TORRENT_ASSERT(t); - write_bitfield(); + if(!m_sent_bitfield) + write_bitfield(); #ifndef TORRENT_DISABLE_DHT if (m_supports_dht_port && m_ses.m_dht) write_dht_port(m_ses.m_external_udp_port); diff --git a/libtorrent/src/disk_io_thread.cpp b/libtorrent/src/disk_io_thread.cpp index 06cb391d..ae7aeec6 100644 --- a/libtorrent/src/disk_io_thread.cpp +++ b/libtorrent/src/disk_io_thread.cpp @@ -1781,6 +1781,7 @@ namespace libtorrent m_file_pool.release(0); } #endif + m_settings = session_settings(); m_settings = *s; delete s; @@ -2201,11 +2202,14 @@ namespace libtorrent mutex::scoped_lock l(m_piece_mutex); INVARIANT_CHECK; + printf("disk_io_thread:hash for piece %d hash\n", j.piece); cache_piece_index_t& idx = m_pieces.get<0>(); cache_piece_index_t::iterator i = find_cached_piece(m_pieces, j, l); if (i != idx.end()) { - TORRENT_ASSERT(i->storage); + //printf("disk_io_thread:flush piece %d (size=%d) before hash\n", + // j.piece, i->piece_size); + TORRENT_ASSERT(i->storage); int ret = flush_range(const_cast(*i), 0, INT_MAX, l); idx.erase(i); if (test_error(j)) diff --git a/libtorrent/src/peer_connection.cpp b/libtorrent/src/peer_connection.cpp index a7d811eb..b582a44d 100644 --- a/libtorrent/src/peer_connection.cpp +++ b/libtorrent/src/peer_connection.cpp @@ -2495,7 +2495,7 @@ namespace libtorrent check_postcondition post_checker2_(t, false); #endif t->async_verify_piece(p.piece, boost::bind(&torrent::piece_finished, t - , p.piece, _1)); + , p.piece, _1, p.length)); } if (is_disconnecting()) return; diff --git a/libtorrent/src/session_impl.cpp b/libtorrent/src/session_impl.cpp index 259a259c..abe82c38 100644 --- a/libtorrent/src/session_impl.cpp +++ b/libtorrent/src/session_impl.cpp @@ -5773,7 +5773,7 @@ retry: if (m_dht) m_dht->getData(username, resource, multi, boost::bind( post_dht_getData, this, _1), boost::bind( getDataDone_fun, this, username, resource, multi, _1, _2)); - } + } void session_impl::on_dht_router_name_lookup(error_code const& e , tcp::resolver::iterator host) diff --git a/libtorrent/src/storage.cpp b/libtorrent/src/storage.cpp index d04ef765..5dc1c630 100644 --- a/libtorrent/src/storage.cpp +++ b/libtorrent/src/storage.cpp @@ -225,6 +225,7 @@ namespace libtorrent buf.iov_len = slot_size; // deliberately pass in 0 as flags, to disable random_access int ret = m_storage->readv(&buf, slot, 0, 1, 0); + //printf("piece_manager::hash_for_slot %d ret=%d\n", slot, ret); if (ret > 0) num_read += ret; // TODO: if the read fails, set error and exit immediately diff --git a/libtorrent/src/torrent.cpp b/libtorrent/src/torrent.cpp index da5ff342..95ad7150 100644 --- a/libtorrent/src/torrent.cpp +++ b/libtorrent/src/torrent.cpp @@ -1097,11 +1097,15 @@ namespace libtorrent } // [MF] test only? - void torrent::add_piece(int piece, char const* data, int flags) + void torrent::add_piece(int piece, char const* data, int size, int flags) { TORRENT_ASSERT(m_ses.is_network_thread()); + + //[MF] increase num pieces to ensure we are not a seeder + increase_num_pieces(piece+2); + TORRENT_ASSERT(piece >= 0 && piece < m_torrent_file->num_pieces()); - int piece_size = m_torrent_file->piece_size(piece); + int piece_size = size; int blocks_in_piece = (piece_size + block_size() - 1) / block_size(); // avoid crash trying to access the picker when there is none @@ -1138,7 +1142,7 @@ namespace libtorrent picker().mark_as_writing(block, 0); } async_verify_piece(piece, boost::bind(&torrent::piece_finished - , shared_from_this(), piece, _1)); + , shared_from_this(), piece, _1, p.length)); picker().dec_refcount(piece, 0); } @@ -1848,7 +1852,7 @@ namespace libtorrent m_picker->mark_as_finished(piece_block(piece, block), 0); if (m_picker->is_piece_finished(piece)) async_verify_piece(piece, boost::bind(&torrent::piece_finished - , shared_from_this(), piece, _1)); + , shared_from_this(), piece, _1, 0)); } } } @@ -3045,7 +3049,7 @@ namespace libtorrent // 0: success, piece passed check // -1: disk failure // -2: piece failed check - void torrent::piece_finished(int index, int passed_hash_check) + void torrent::piece_finished(int index, int passed_hash_check, int piece_size) { TORRENT_ASSERT(m_ses.is_network_thread()); #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING @@ -3053,7 +3057,7 @@ namespace libtorrent , index, ((passed_hash_check == 0) ?"passed":passed_hash_check == -1 ?"disk failed":"failed") - , m_torrent_file->piece_size(index)); + , piece_size); #endif TORRENT_ASSERT(valid_metadata()); @@ -4926,6 +4930,21 @@ namespace libtorrent if(has_picker()) { m_picker->increase_num_pieces(num_pieces); } + + if (m_connections_initialized) + { + // all peer connections have to update num_pieces + for (torrent::peer_iterator i = m_connections.begin(); + i != m_connections.end();) + { + peer_connection* pc = *i; + ++i; + if (pc->is_disconnecting()) continue; + pc->on_metadata_impl(); + //if (pc->is_disconnecting()) continue; + //pc->init(); + } + } } void torrent::read_resume_data(lazy_entry const& rd) @@ -6286,8 +6305,8 @@ namespace libtorrent ++i; if (pc->is_disconnecting()) continue; pc->on_metadata_impl(); - if (pc->is_disconnecting()) continue; - pc->init(); + //if (pc->is_disconnecting()) continue; + //pc->init(); } } diff --git a/libtorrent/src/torrent_handle.cpp b/libtorrent/src/torrent_handle.cpp index a9db6f0f..aa606734 100644 --- a/libtorrent/src/torrent_handle.cpp +++ b/libtorrent/src/torrent_handle.cpp @@ -227,6 +227,16 @@ namespace libtorrent t.reset(); \ do { ses.cond.wait(l); } while(!done); } +#define TORRENT_SYNC_CALL4(x, a1, a2, a3, a4) \ + boost::shared_ptr t = m_torrent.lock(); \ + if (t) { \ + bool done = false; \ + session_impl& ses = t->session(); \ + mutex::scoped_lock l(ses.mut); \ + ses.m_io_service.dispatch(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function(boost::bind(&torrent:: x, t, a1, a2, a3, a4)))); \ + t.reset(); \ + do { ses.cond.wait(l); } while(!done); } + #define TORRENT_SYNC_CALL_RET(type, def, x) \ boost::shared_ptr t = m_torrent.lock(); \ if (!t) return def; \ @@ -789,10 +799,11 @@ namespace libtorrent TORRENT_ASYNC_CALL1(add_tracker, url); } - void torrent_handle::add_piece(int piece, char const* data, int flags) const + void torrent_handle::add_piece(int piece, char const* data, int size, int flags) const { INVARIANT_CHECK; - TORRENT_SYNC_CALL3(add_piece, piece, data, flags); + // [MF] sync just to ensure the buffer still valid + TORRENT_SYNC_CALL4(add_piece, piece, data, size, flags); } void torrent_handle::read_piece(int piece) const diff --git a/libtorrent/test/test_torrent.cpp b/libtorrent/test/test_torrent.cpp index 61ddfea2..a016e26f 100644 --- a/libtorrent/test/test_torrent.cpp +++ b/libtorrent/test/test_torrent.cpp @@ -108,7 +108,7 @@ void test_running_torrent(boost::intrusive_ptr info, size_type fil std::vector piece(info->piece_length()); for (int i = 0; i < int(piece.size()); ++i) piece[i] = (i % 26) + 'A'; - h.add_piece(0, &piece[0]); + h.add_piece(0, &piece[0], info->piece_length()); test_sleep(10000); st = h.status(); TEST_CHECK(st.pieces.size() > 0 && st.pieces[0] == true); diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index cf44de7f..cf9cbbe6 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -238,6 +238,7 @@ static const CRPCCommand vRPCCommands[] = // twister dht network { "dhtput", &dhtput, false, true }, { "dhtget", &dhtget, false, true }, + { "newpostmsg", &newpostmsg, false, true }, }; CRPCTable::CRPCTable() diff --git a/src/bitcoinrpc.h b/src/bitcoinrpc.h index d0899245..a4bbc833 100644 --- a/src/bitcoinrpc.h +++ b/src/bitcoinrpc.h @@ -195,5 +195,6 @@ extern json_spirit::Value gettxout(const json_spirit::Array& params, bool fHelp) extern json_spirit::Value verifychain(const json_spirit::Array& params, bool fHelp); extern json_spirit::Value dhtput(const json_spirit::Array& params, bool fHelp); extern json_spirit::Value dhtget(const json_spirit::Array& params, bool fHelp); +extern json_spirit::Value newpostmsg(const json_spirit::Array& params, bool fHelp); #endif diff --git a/src/twister.cpp b/src/twister.cpp index a6ff3566..aa07e5b5 100644 --- a/src/twister.cpp +++ b/src/twister.cpp @@ -34,6 +34,7 @@ static session *ses = NULL; static CCriticalSection cs_dhtgetMap; static map m_dhtgetMap; static map m_specialResources; +static map m_userTorrent; sha1_hash dhtTargetHash(std::string const &username, std::string const &resource, std::string const &type) { @@ -347,15 +348,14 @@ void ThreadSessionAlerts() // Do something! printf("Neighbor of special resource - do something!\n"); if( dd->m_resource == "tracker" ) { - torrent_handle hnd = ses->find_torrent(ih); - if( !hnd.is_valid() ) { + if( !m_userTorrent.count(dd->m_username) ) { printf("adding torrent for [%s,tracker]\n", dd->m_username.c_str()); add_torrent_params tparams; tparams.info_hash = ih; tparams.name = dd->m_username; boost::filesystem::path torrentPath = GetDataDir() / "swarm" / ""; tparams.save_path= torrentPath.string(); - ses->async_add_torrent(tparams); + m_userTorrent[dd->m_username] = ses->add_torrent(tparams); } } } @@ -407,6 +407,8 @@ void startSessionTorrent(boost::thread_group& threadGroup) printf("startSessionTorrent (waiting for external IP)\n"); m_specialResources["tracker"] = true; + m_specialResources["swarm"] = true; + threadGroup.create_thread(boost::bind(&ThreadWaitExtIP)); threadGroup.create_thread(boost::bind(&ThreadMaintainDHTNodes)); @@ -806,3 +808,31 @@ Value dhtget(const Array& params, bool fHelp) return ret; } +Value newpostmsg(const Array& params, bool fHelp) +{ + if (fHelp || params.size() != 3) + throw runtime_error( + "newpost \n" + "Post a new message to swarm"); + + EnsureWalletIsUnlocked(); + + string strUsername = params[0].get_str(); + string strK = params[1].get_str(); + string strMsg = params[2].get_str(); + int k = atoi( strK.c_str() ); + + entry v; + createSignedUserpost(v, strUsername, k, strMsg, + NULL, NULL, NULL, + std::string(""), 0); + + std::vector buf; + bencode(std::back_inserter(buf), v); + + torrent_handle h = m_userTorrent[strUsername]; + h.add_piece(k,buf.data(),buf.size()); + + return Value(); +} +