experimental 'peekpost' RPC

This commit is contained in:
Miguel Freitas 2015-12-18 21:20:50 -02:00
parent 3fd1432f4a
commit 679246064f
3 changed files with 145 additions and 20 deletions

View File

@ -284,6 +284,7 @@ static const CRPCCommand vRPCCommands[] =
{ "leavegroup", &leavegroup, false, true, false }, { "leavegroup", &leavegroup, false, true, false },
{ "getpieceavailability", &getpieceavailability, false, true, true }, { "getpieceavailability", &getpieceavailability, false, true, true },
{ "getpiecemaxseen", &getpiecemaxseen, false, true, true }, { "getpiecemaxseen", &getpiecemaxseen, false, true, true },
{ "peekpost", &peekpost, false, true, true },
}; };
CRPCTable::CRPCTable() CRPCTable::CRPCTable()
@ -1362,6 +1363,8 @@ Array RPCConvertValues(const std::string &strMethod, const std::vector<std::stri
if (strMethod == "newgroupdescription" && n > 1) ConvertTo<boost::int64_t>(params[1]); if (strMethod == "newgroupdescription" && n > 1) ConvertTo<boost::int64_t>(params[1]);
if (strMethod == "getpieceavailability" && n > 1) ConvertTo<boost::int64_t>(params[1]); if (strMethod == "getpieceavailability" && n > 1) ConvertTo<boost::int64_t>(params[1]);
if (strMethod == "getpiecemaxseen" && n > 1) ConvertTo<boost::int64_t>(params[1]); if (strMethod == "getpiecemaxseen" && n > 1) ConvertTo<boost::int64_t>(params[1]);
if (strMethod == "peekpost" && n > 1) ConvertTo<boost::int64_t>(params[1]);
if (strMethod == "peekpost" && n > 3) ConvertTo<boost::int64_t>(params[3]);
return params; return params;
} }

View File

@ -49,6 +49,7 @@ enum RPCErrorCode
RPC_DATABASE_ERROR = -20, // Database error RPC_DATABASE_ERROR = -20, // Database error
RPC_DESERIALIZATION_ERROR = -22, // Error parsing or validating structure in raw format RPC_DESERIALIZATION_ERROR = -22, // Error parsing or validating structure in raw format
RPC_FORBIDDEN_ON_PUBLIC_SERVER = -23, // public server mode is activated, this method is not allowed RPC_FORBIDDEN_ON_PUBLIC_SERVER = -23, // public server mode is activated, this method is not allowed
RPC_TIMEOUT = -24, // timeout on resource retrieval
// P2P client errors // P2P client errors
RPC_CLIENT_NOT_CONNECTED = -9, // Bitcoin is not connected RPC_CLIENT_NOT_CONNECTED = -9, // Bitcoin is not connected
@ -234,5 +235,6 @@ extern json_spirit::Value newgroupdescription(const json_spirit::Array& params,
extern json_spirit::Value leavegroup(const json_spirit::Array& params, bool fHelp); extern json_spirit::Value leavegroup(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getpieceavailability(const json_spirit::Array& params, bool fHelp); extern json_spirit::Value getpieceavailability(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getpiecemaxseen(const json_spirit::Array& params, bool fHelp); extern json_spirit::Value getpiecemaxseen(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value peekpost(const json_spirit::Array& params, bool fHelp);
#endif #endif

View File

@ -138,15 +138,16 @@ void dhtgetMapPost(sha1_hash &ih, const alert &a)
} }
} }
torrent_handle startTorrentUser(std::string const &username, bool following) torrent_handle startTorrentUser(std::string const &username, bool following, int peek_single_piece=-1)
{ {
bool userInTxDb = usernameExists(username); // keep this outside cs_twister to avoid deadlock bool userInTxDb = usernameExists(username); // keep this outside cs_twister to avoid deadlock
boost::shared_ptr<session> ses(m_ses); boost::shared_ptr<session> ses(m_ses);
if( !userInTxDb || !ses ) if( !userInTxDb || !ses )
return torrent_handle(); return torrent_handle();
torrent_handle h;
LOCK(cs_twister); LOCK(cs_twister);
if( !m_userTorrent.count(username) ) { if( !m_userTorrent.count(username) || peek_single_piece >= 0 ) {
sha1_hash ih = dhtTargetHash(username, "tracker", "m"); sha1_hash ih = dhtTargetHash(username, "tracker", "m");
printf("adding torrent for [%s,tracker]\n", username.c_str()); printf("adding torrent for [%s,tracker]\n", username.c_str());
@ -155,6 +156,7 @@ torrent_handle startTorrentUser(std::string const &username, bool following)
tparams.name = username; tparams.name = username;
boost::filesystem::path torrentPath = GetDataDir() / "swarm"; boost::filesystem::path torrentPath = GetDataDir() / "swarm";
tparams.save_path= torrentPath.string(); tparams.save_path= torrentPath.string();
tparams.peek_single_piece = peek_single_piece;
boost::system::error_code ec; boost::system::error_code ec;
boost::filesystem::create_directory(torrentPath, ec); boost::filesystem::create_directory(torrentPath, ec);
if (ec) { if (ec) {
@ -163,18 +165,23 @@ torrent_handle startTorrentUser(std::string const &username, bool following)
std::string filename = combine_path(tparams.save_path, to_hex(ih.to_string()) + ".resume"); std::string filename = combine_path(tparams.save_path, to_hex(ih.to_string()) + ".resume");
load_file(filename.c_str(), tparams.resume_data); load_file(filename.c_str(), tparams.resume_data);
m_userTorrent[username] = ses->add_torrent(tparams); h = ses->add_torrent(tparams);
if( !following ) { if( peek_single_piece == -1 ) {
m_userTorrent[username].auto_managed(true); m_userTorrent[username] = h;
} }
m_userTorrent[username].force_dht_announce(); if( !following ) {
h.auto_managed(true);
}
h.force_dht_announce();
} else {
h = m_userTorrent[username];
} }
if( following ) { if( following ) {
m_userTorrent[username].set_following(true); h.set_following(true);
m_userTorrent[username].auto_managed(false); h.auto_managed(false);
m_userTorrent[username].resume(); h.resume();
} }
return m_userTorrent[username]; return h;
} }
torrent_handle getTorrentUser(std::string const &username) torrent_handle getTorrentUser(std::string const &username)
@ -1636,9 +1643,9 @@ bool createSignedUserpost(entry &v, std::string const &username, int k,
userpost["height"] = getBestHeight() - 1; // be conservative userpost["height"] = getBestHeight() - 1; // be conservative
if( msg.size() ) { if( msg.size() ) {
//userpost["t"] = "post";
userpost["msg"] = msg; userpost["msg"] = msg;
} }
switch(flag) switch(flag)
{ {
case USERPOST_FLAG_RT: case USERPOST_FLAG_RT:
@ -3615,7 +3622,7 @@ Value search(const Array& params, bool fHelp)
boost::algorithm::to_lower(keyword); boost::algorithm::to_lower(keyword);
string allowed = "abcdefghijklmnopqrstuvwxyz0123456789_"; string allowed = "abcdefghijklmnopqrstuvwxyz0123456789_";
for( int i = 0; i < allowed.size(); ++i ) { for( int i = 0; i < (int)allowed.size(); ++i ) {
set<string> usernames; set<string> usernames;
string prefix(1, allowed[i]); string prefix(1, allowed[i]);
pblocktree->GetNamesFromPartial(prefix, usernames, std::numeric_limits<int>::max()); pblocktree->GetNamesFromPartial(prefix, usernames, std::numeric_limits<int>::max());
@ -3679,7 +3686,7 @@ Value search(const Array& params, bool fHelp)
params.push_back(user); params.push_back(user);
params.push_back(INT_MAX); params.push_back(INT_MAX);
Array favs = getfavs(params, false).get_array(); Array favs = getfavs(params, false).get_array();
for (int i = 0; i < favs.size(); i++) for (int i = 0; i < (int)favs.size(); i++)
{ {
entry favp = jsonToEntry(favs[i]); entry favp = jsonToEntry(favs[i]);
entry *favu = favp.find_key("userpost"); entry *favu = favp.find_key("userpost");
@ -3706,7 +3713,7 @@ Value search(const Array& params, bool fHelp)
} }
std::multimap<int64_t,Value>::reverse_iterator rit; std::multimap<int64_t,Value>::reverse_iterator rit;
for (rit = postsByTime.rbegin(); rit != postsByTime.rend() && ret.size() < count; ++rit) for (rit = postsByTime.rbegin(); rit != postsByTime.rend() && (int)ret.size() < count; ++rit)
ret.push_back(rit->second); ret.push_back(rit->second);
} }
else { else {
@ -4007,8 +4014,6 @@ Value getpieceavailability(const Array& params, bool fHelp)
"getpieceavailability <username> <k>\n" "getpieceavailability <username> <k>\n"
"Get piece availability (peer count for this piece)"); "Get piece availability (peer count for this piece)");
EnsureWalletIsUnlocked();
string strUsername = params[0].get_str(); string strUsername = params[0].get_str();
int k = params[1].get_int(); int k = params[1].get_int();
@ -4016,7 +4021,7 @@ Value getpieceavailability(const Array& params, bool fHelp)
std::vector<int> avail; std::vector<int> avail;
h.piece_availability(avail); h.piece_availability(avail);
return avail.size() > k ? avail.at(k) : 0; return (int)avail.size() > k ? avail.at(k) : 0;
} }
Value getpiecemaxseen(const Array& params, bool fHelp) Value getpiecemaxseen(const Array& params, bool fHelp)
@ -4026,8 +4031,6 @@ Value getpiecemaxseen(const Array& params, bool fHelp)
"getpiecemaxseen <username> <k>\n" "getpiecemaxseen <username> <k>\n"
"Get piece max seen availability (max peer count for this piece)"); "Get piece max seen availability (max peer count for this piece)");
EnsureWalletIsUnlocked();
string strUsername = params[0].get_str(); string strUsername = params[0].get_str();
int k = params[1].get_int(); int k = params[1].get_int();
@ -4035,6 +4038,123 @@ Value getpiecemaxseen(const Array& params, bool fHelp)
std::vector<int> max_seen; std::vector<int> max_seen;
h.piece_max_seen(max_seen); h.piece_max_seen(max_seen);
return max_seen.size() > k ? max_seen.at(k) : 0; return (int)max_seen.size() > k ? max_seen.at(k) : 0;
}
Value peekpost(const Array& params, bool fHelp)
{
if (fHelp || params.size() < 2 || params.size() > 4 )
throw runtime_error(
"peekpost <username> <k> [field='*'] [timeout_sec=90]\n"
"Peek post from best/faster available source, either\n"
"local torrent, DHT or remote torrent (peek extension).\n"
"field is a convenience to return specific post field\n"
"instead of the whole post (eg. 'url')");
boost::shared_ptr<session> ses(m_ses);
if( !ses )
throw JSONRPCError(RPC_INTERNAL_ERROR, "uninitialized session");
string strUsername = params[0].get_str();
int k = std::max(params[1].get_int(), 0);
string strField = "*";
if( params.size() > 2 )
strField = params[2].get_str();
time_duration timeToWait = seconds(90);
if( params.size() > 3 )
timeToWait = seconds(params[3].get_int());
entry vEntry;
torrent_handle h = getTorrentUser(strUsername);
if( h.is_valid() ) {
std::vector<std::string> pieces;
int allowed_flags = 0xff;
int required_flags = 0;
h.get_pieces(pieces, 1, k, k-1, allowed_flags, required_flags);
lazy_entry v;
int pos;
libtorrent::error_code ec;
if(pieces.size() &&
lazy_bdecode(pieces[0].data(), pieces[0].data()+pieces[0].size(), v, ec, &pos) == 0 &&
v.type() == lazy_entry::dict_t) {
printf("peekpiece: got piece (%s,%d) from local torrent\n",strUsername.c_str(), k);
vEntry = v;
}
} else {
/* there is quite some code shared with dhtget, but it is intermigled with
* torrent's piece peek. so we accept a little copy-paste for now. */
alert_manager am(10, alert::dht_notification);
string strResource = "post" + boost::lexical_cast<std::string>(k);
bool multi = false;
sha1_hash ih = dhtTargetHash(strUsername,strResource,"s");
vector<CNode*> dhtProxyNodes;
if( !DhtProxy::fEnabled ) {
dhtgetMapAdd(ih, &am);
dhtGetData(strUsername, strResource, multi, true);
} else {
DhtProxy::dhtgetMapAdd(ih, &am);
dhtProxyNodes = DhtProxy::dhtgetStartRequest(strUsername, strResource, multi);
}
h = startTorrentUser(strUsername,true,k);
// this loop receives alerts from both dht network and torrent peek extension
while( am.wait_for_alert(timeToWait) ) {
std::auto_ptr<alert> a(am.get());
dht_reply_data_alert const* rd = alert_cast<dht_reply_data_alert>(&(*a));
if( rd && rd->m_lst.size() ) {
entry dhtEntry = rd->m_lst.front();
entry const *pEntry = dhtEntry.find_key("p");
if( pEntry && pEntry->type() == entry::dictionary_t ) {
entry const *v = pEntry->find_key("v");
if( v && v->type() == entry::dictionary_t ) {
vEntry = *v;
}
}
string source = "dht";
entry const *pSigEntry = dhtEntry.find_key("sig_p");
if(pSigEntry && pSigEntry->type() == entry::string_t &&
pSigEntry->string() == "peek" ) {
source = "peek";
}
printf("peekpiece: got piece (%s,%d) from %s\n",strUsername.c_str(), k, source.c_str());
break;
}
}
if( h.is_valid() )
ses->remove_torrent(h);
if( !DhtProxy::fEnabled ) {
dhtgetMapRemove(ih,&am);
} else {
DhtProxy::dhtgetMapRemove(ih,&am);
DhtProxy::dhtgetStopRequest(dhtProxyNodes, strUsername, strResource, multi);
}
}
Value ret;
if( vEntry.type() == entry::dictionary_t ) {
hexcapePost(vEntry);
if( strField == "*" ) {
ret = entryToJson(vEntry);
} else {
entry const *userpost = vEntry.find_key("userpost");
if( userpost && userpost->type() == entry::dictionary_t ) {
entry const *f = userpost->find_key(strField);
if( f && f->type() == entry::string_t ) {
ret = f->string();
}
}
}
} else {
throw JSONRPCError(RPC_TIMEOUT, "timeout or post not found");
}
return ret;
} }