newpostmsg command, working.

This commit is contained in:
miguel 2013-08-20 12:18:03 -03:00
parent 4f51727c1b
commit 5112e73a07
13 changed files with 89 additions and 21 deletions

View File

@ -184,7 +184,7 @@ namespace libtorrent
int seed_rank(session_settings const& s) const;
enum flags_t { overwrite_existing = 1 };
void add_piece(int piece, char const* data, int flags = 0);
void add_piece(int piece, char const* data, int size, int flags = 0);
void on_disk_write_complete(int ret, disk_io_job const& j
, peer_request p);
void on_disk_cache_complete(int ret, disk_io_job const& j);
@ -659,7 +659,7 @@ namespace libtorrent
// this is called from the peer_connection
// each time a piece has failed the hash
// test
void piece_finished(int index, int passed_hash_check);
void piece_finished(int index, int passed_hash_check, int piece_size);
// piece_passed is called when a piece passes the hash check
// this will tell all peers that we just got his piece

View File

@ -166,7 +166,7 @@ namespace libtorrent
torrent_handle() {}
enum flags_t { overwrite_existing = 1 };
void add_piece(int piece, char const* data, int flags = 0) const;
void add_piece(int piece, char const* data, int size, int flags = 0) const;
void read_piece(int piece) const;
bool have_piece(int piece) const;

View File

@ -227,7 +227,8 @@ namespace libtorrent
if (m_state < read_packet_size) return;
boost::shared_ptr<torrent> t = associated_torrent().lock();
TORRENT_ASSERT(t);
write_bitfield();
if(!m_sent_bitfield)
write_bitfield();
#ifndef TORRENT_DISABLE_DHT
if (m_supports_dht_port && m_ses.m_dht)
write_dht_port(m_ses.m_external_udp_port);

View File

@ -1781,6 +1781,7 @@ namespace libtorrent
m_file_pool.release(0);
}
#endif
m_settings = session_settings();
m_settings = *s;
delete s;
@ -2201,11 +2202,14 @@ namespace libtorrent
mutex::scoped_lock l(m_piece_mutex);
INVARIANT_CHECK;
printf("disk_io_thread:hash for piece %d hash\n", j.piece);
cache_piece_index_t& idx = m_pieces.get<0>();
cache_piece_index_t::iterator i = find_cached_piece(m_pieces, j, l);
if (i != idx.end())
{
TORRENT_ASSERT(i->storage);
//printf("disk_io_thread:flush piece %d (size=%d) before hash\n",
// j.piece, i->piece_size);
TORRENT_ASSERT(i->storage);
int ret = flush_range(const_cast<cached_piece_entry&>(*i), 0, INT_MAX, l);
idx.erase(i);
if (test_error(j))

View File

@ -2495,7 +2495,7 @@ namespace libtorrent
check_postcondition post_checker2_(t, false);
#endif
t->async_verify_piece(p.piece, boost::bind(&torrent::piece_finished, t
, p.piece, _1));
, p.piece, _1, p.length));
}
if (is_disconnecting()) return;

View File

@ -5773,7 +5773,7 @@ retry:
if (m_dht) m_dht->getData(username, resource, multi,
boost::bind( post_dht_getData, this, _1),
boost::bind( getDataDone_fun, this, username, resource, multi, _1, _2));
}
}
void session_impl::on_dht_router_name_lookup(error_code const& e
, tcp::resolver::iterator host)

View File

@ -225,6 +225,7 @@ namespace libtorrent
buf.iov_len = slot_size;
// deliberately pass in 0 as flags, to disable random_access
int ret = m_storage->readv(&buf, slot, 0, 1, 0);
//printf("piece_manager::hash_for_slot %d ret=%d\n", slot, ret);
if (ret > 0) num_read += ret;
// TODO: if the read fails, set error and exit immediately

View File

@ -1097,11 +1097,15 @@ namespace libtorrent
}
// [MF] test only?
void torrent::add_piece(int piece, char const* data, int flags)
void torrent::add_piece(int piece, char const* data, int size, int flags)
{
TORRENT_ASSERT(m_ses.is_network_thread());
//[MF] increase num pieces to ensure we are not a seeder
increase_num_pieces(piece+2);
TORRENT_ASSERT(piece >= 0 && piece < m_torrent_file->num_pieces());
int piece_size = m_torrent_file->piece_size(piece);
int piece_size = size;
int blocks_in_piece = (piece_size + block_size() - 1) / block_size();
// avoid crash trying to access the picker when there is none
@ -1138,7 +1142,7 @@ namespace libtorrent
picker().mark_as_writing(block, 0);
}
async_verify_piece(piece, boost::bind(&torrent::piece_finished
, shared_from_this(), piece, _1));
, shared_from_this(), piece, _1, p.length));
picker().dec_refcount(piece, 0);
}
@ -1848,7 +1852,7 @@ namespace libtorrent
m_picker->mark_as_finished(piece_block(piece, block), 0);
if (m_picker->is_piece_finished(piece))
async_verify_piece(piece, boost::bind(&torrent::piece_finished
, shared_from_this(), piece, _1));
, shared_from_this(), piece, _1, 0));
}
}
}
@ -3045,7 +3049,7 @@ namespace libtorrent
// 0: success, piece passed check
// -1: disk failure
// -2: piece failed check
void torrent::piece_finished(int index, int passed_hash_check)
void torrent::piece_finished(int index, int passed_hash_check, int piece_size)
{
TORRENT_ASSERT(m_ses.is_network_thread());
#if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
@ -3053,7 +3057,7 @@ namespace libtorrent
, index, ((passed_hash_check == 0)
?"passed":passed_hash_check == -1
?"disk failed":"failed")
, m_torrent_file->piece_size(index));
, piece_size);
#endif
TORRENT_ASSERT(valid_metadata());
@ -4926,6 +4930,21 @@ namespace libtorrent
if(has_picker()) {
m_picker->increase_num_pieces(num_pieces);
}
if (m_connections_initialized)
{
// all peer connections have to update num_pieces
for (torrent::peer_iterator i = m_connections.begin();
i != m_connections.end();)
{
peer_connection* pc = *i;
++i;
if (pc->is_disconnecting()) continue;
pc->on_metadata_impl();
//if (pc->is_disconnecting()) continue;
//pc->init();
}
}
}
void torrent::read_resume_data(lazy_entry const& rd)
@ -6286,8 +6305,8 @@ namespace libtorrent
++i;
if (pc->is_disconnecting()) continue;
pc->on_metadata_impl();
if (pc->is_disconnecting()) continue;
pc->init();
//if (pc->is_disconnecting()) continue;
//pc->init();
}
}

View File

@ -227,6 +227,16 @@ namespace libtorrent
t.reset(); \
do { ses.cond.wait(l); } while(!done); }
#define TORRENT_SYNC_CALL4(x, a1, a2, a3, a4) \
boost::shared_ptr<torrent> t = m_torrent.lock(); \
if (t) { \
bool done = false; \
session_impl& ses = t->session(); \
mutex::scoped_lock l(ses.mut); \
ses.m_io_service.dispatch(boost::bind(&fun_wrap, &done, &ses.cond, &ses.mut, boost::function<void(void)>(boost::bind(&torrent:: x, t, a1, a2, a3, a4)))); \
t.reset(); \
do { ses.cond.wait(l); } while(!done); }
#define TORRENT_SYNC_CALL_RET(type, def, x) \
boost::shared_ptr<torrent> t = m_torrent.lock(); \
if (!t) return def; \
@ -789,10 +799,11 @@ namespace libtorrent
TORRENT_ASYNC_CALL1(add_tracker, url);
}
void torrent_handle::add_piece(int piece, char const* data, int flags) const
void torrent_handle::add_piece(int piece, char const* data, int size, int flags) const
{
INVARIANT_CHECK;
TORRENT_SYNC_CALL3(add_piece, piece, data, flags);
// [MF] sync just to ensure the buffer still valid
TORRENT_SYNC_CALL4(add_piece, piece, data, size, flags);
}
void torrent_handle::read_piece(int piece) const

View File

@ -108,7 +108,7 @@ void test_running_torrent(boost::intrusive_ptr<torrent_info> info, size_type fil
std::vector<char> piece(info->piece_length());
for (int i = 0; i < int(piece.size()); ++i)
piece[i] = (i % 26) + 'A';
h.add_piece(0, &piece[0]);
h.add_piece(0, &piece[0], info->piece_length());
test_sleep(10000);
st = h.status();
TEST_CHECK(st.pieces.size() > 0 && st.pieces[0] == true);

View File

@ -238,6 +238,7 @@ static const CRPCCommand vRPCCommands[] =
// twister dht network
{ "dhtput", &dhtput, false, true },
{ "dhtget", &dhtget, false, true },
{ "newpostmsg", &newpostmsg, false, true },
};
CRPCTable::CRPCTable()

View File

@ -195,5 +195,6 @@ extern json_spirit::Value gettxout(const json_spirit::Array& params, bool fHelp)
extern json_spirit::Value verifychain(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value dhtput(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value dhtget(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value newpostmsg(const json_spirit::Array& params, bool fHelp);
#endif

View File

@ -34,6 +34,7 @@ static session *ses = NULL;
static CCriticalSection cs_dhtgetMap;
static map<sha1_hash, alert_manager*> m_dhtgetMap;
static map<std::string, bool> m_specialResources;
static map<std::string, torrent_handle> m_userTorrent;
sha1_hash dhtTargetHash(std::string const &username, std::string const &resource, std::string const &type)
{
@ -347,15 +348,14 @@ void ThreadSessionAlerts()
// Do something!
printf("Neighbor of special resource - do something!\n");
if( dd->m_resource == "tracker" ) {
torrent_handle hnd = ses->find_torrent(ih);
if( !hnd.is_valid() ) {
if( !m_userTorrent.count(dd->m_username) ) {
printf("adding torrent for [%s,tracker]\n", dd->m_username.c_str());
add_torrent_params tparams;
tparams.info_hash = ih;
tparams.name = dd->m_username;
boost::filesystem::path torrentPath = GetDataDir() / "swarm" / "";
tparams.save_path= torrentPath.string();
ses->async_add_torrent(tparams);
m_userTorrent[dd->m_username] = ses->add_torrent(tparams);
}
}
}
@ -407,6 +407,8 @@ void startSessionTorrent(boost::thread_group& threadGroup)
printf("startSessionTorrent (waiting for external IP)\n");
m_specialResources["tracker"] = true;
m_specialResources["swarm"] = true;
threadGroup.create_thread(boost::bind(&ThreadWaitExtIP));
threadGroup.create_thread(boost::bind(&ThreadMaintainDHTNodes));
@ -806,3 +808,31 @@ Value dhtget(const Array& params, bool fHelp)
return ret;
}
Value newpostmsg(const Array& params, bool fHelp)
{
if (fHelp || params.size() != 3)
throw runtime_error(
"newpost <username> <k> <msg>\n"
"Post a new message to swarm");
EnsureWalletIsUnlocked();
string strUsername = params[0].get_str();
string strK = params[1].get_str();
string strMsg = params[2].get_str();
int k = atoi( strK.c_str() );
entry v;
createSignedUserpost(v, strUsername, k, strMsg,
NULL, NULL, NULL,
std::string(""), 0);
std::vector<char> buf;
bencode(std::back_inserter(buf), v);
torrent_handle h = m_userTorrent[strUsername];
h.add_piece(k,buf.data(),buf.size());
return Value();
}