Merge branch 'master' of /home/miguel/softs/twister

This commit is contained in:
Miguel Freitas 2013-11-02 12:35:58 -02:00
commit 7e1468299b
21 changed files with 256 additions and 120 deletions

View File

@ -14,6 +14,7 @@ libtorrent's.
1) Start with libtorrent:
$ cd libtorrent
$ ./bootstrap.sh
$ ./configure --enable-logging --enable-debug --enable-dht
Note1: 64-bit systems may need "--with-boost-libdir=/usr/lib64"

5
TODO
View File

@ -66,3 +66,8 @@ of his own torrent)
- Estimate number of online followers by quering the "tracker" resource (implement a value within
this resource to report the number of torrent peers)
- Define and enforce html directory to serve from. Do installation scripts.
- Don't accept dht "post"+k if k violates the validatePostNumberForUser() rule.

View File

@ -523,6 +523,8 @@ namespace libtorrent
return m_picker->have_piece(index);
}
void recheck_pieces(uint32_t piece_flags);
// called when we learn that we have a piece
// only once per piece
void we_have(int index, boost::uint32_t post_flags);

View File

@ -170,6 +170,7 @@ namespace libtorrent
void read_piece(int piece) const;
void get_pieces(std::vector<std::string> &pieces, int count, int max_id, int since_id, uint32_t filter_flags) const;
bool have_piece(int piece) const;
void recheck_pieces(uint32_t piece_flags) const;
void get_full_peer_list(std::vector<peer_list_entry>& v) const;
void get_peer_info(std::vector<peer_info>& v) const;

View File

@ -206,10 +206,6 @@ bool find_data::invoke(observer_ptr o)
return false;
}
// im not going to ask trackers from myself
if( o->id() == m_node.nid() )
return false;
entry e;
e["z"] = "q";
e["q"] = "getData"; // "getPeers"

View File

@ -1179,13 +1179,11 @@ void node_impl::incoming_request(msg const& m, entry& e)
sha1_hash target = hasher(targetbuf.first,targetbuf.second).final();
//#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::string target_str(targetbuf.first,targetbuf.second);
printf("PUT target: %s = {%s,%s,%s} = '%s'\n"
, to_hex(target.to_string()).c_str()
printf("PUT target={%s,%s,%s} from=%s:%d\n"
, msg_keys[mk_n]->string_value().c_str()
, msg_keys[mk_r]->string_value().c_str()
, msg_keys[mk_t]->string_value().c_str()
, target_str.c_str());
, m.addr.address().to_string().c_str(), m.addr.port());
//#endif
// verify the write-token. tokens are only valid to write to
@ -1359,13 +1357,11 @@ void node_impl::incoming_request(msg const& m, entry& e)
if (msg_keys[mk_justtoken] && msg_keys[mk_justtoken]->int_value() != 0) justtoken = true;
//#ifdef TORRENT_DHT_VERBOSE_LOGGING
std::string target_str(targetbuf.first,targetbuf.second);
printf("GET target: %s = {%s,%s,%s} = '%s'\n"
, to_hex(target.to_string()).c_str()
printf("GET target={%s,%s,%s} from=%s:%d\n"
, msg_keys[mk_n]->string_value().c_str()
, msg_keys[mk_r]->string_value().c_str()
, msg_keys[mk_t]->string_value().c_str()
, target_str.c_str());
, m.addr.address().to_string().c_str(), m.addr.port());
//#endif
reply["token"] = generate_token(m.addr, target.to_string().c_str());

View File

@ -1235,7 +1235,7 @@ namespace libtorrent
, coalesce_writes(false)
, outgoing_ports(0,0)
, peer_tos(0)
, active_downloads(3)
, active_downloads(6)
, active_seeds(5)
, active_dht_limit(88) // don't announce more than once every 40 seconds
, active_tracker_limit(1600) // don't announce to trackers more than once every 1.125 seconds

View File

@ -2062,15 +2062,16 @@ namespace libtorrent
TORRENT_ASSERT(m_allow_peers);
// [MF] use m_dht->announce with myself=false to update dht tracker with other peers
// [MF] use m_dht->announce with myself=false to update dht tracker with peers we know
{
policy::const_iterator i = get_policy().begin_peer();
policy::const_iterator end = get_policy().end_peer();
for (; i != end; ++i)
{
for (; i != end; ++i) {
policy::peer const* p = *i;
if( p->connectable && !p->banned ) {
bool connect_recently = !p->banned && int(p->failcount) < settings().max_failcount &&
p->last_connected && (m_ses.session_time() - p->last_connected) < (4*3600);
if( p->connectable && ( p->connection || connect_recently) ) {
m_ses.m_dht->announce(name(), m_torrent_file->info_hash()
, p->address(), p->port, p->seed, false
, boost::bind(&nop));
@ -3072,6 +3073,30 @@ namespace libtorrent
m_picker->set_piece_priority(i, 6);
}
void on_disk_read_recheck_piece_complete(int ret, disk_io_job const& j, peer_request r)
{
// [MF] FIXME: implement cond_wakeup here so that recheck_pieces would wait
}
void torrent::recheck_pieces(uint32_t piece_flags)
{
TORRENT_ASSERT(m_ses.is_network_thread());
TORRENT_ASSERT(m_picker);
for( int i = 0; i <= last_have(); i++) {
if( m_picker->have_piece(i) && m_picker->post_flags(i) == piece_flags ) {
peer_request r;
r.piece = i;
r.start = 0;
r.length = torrent_file().piece_size(i);
filesystem().async_read_and_hash(r,
boost::bind(&on_disk_read_recheck_piece_complete
, _1, _2, r), 1);
}
}
}
void torrent::we_have(int index, boost::uint32_t post_flags)
{
TORRENT_ASSERT(m_ses.is_network_thread());

View File

@ -846,6 +846,12 @@ namespace libtorrent
return r;
}
void torrent_handle::recheck_pieces(uint32_t piece_flags) const
{
INVARIANT_CHECK;
TORRENT_SYNC_CALL1(recheck_pieces, piece_flags);
}
storage_interface* torrent_handle::get_storage_impl() const
{
INVARIANT_CHECK;

View File

@ -124,64 +124,6 @@ bool AppInit(int argc, char* argv[])
}
static const string strSecret1C ("Kwr371tjA9u2rFSMZjTNun2PXXP3WPZu2afRHTcta6KxEUdm1vEw");
static bool TestCreateSpamMsgTx()
{
CTransaction txNew;
txNew.message = CScript() << strSpamMessage;
CBitcoinSecret bsecret1;
bsecret1.SetString (strSecret1C);
CKey key;
key.MakeNewKey(true);
//CKey key = bsecret1.GetKey();
CPubKey pubkey( key.GetPubKey() );
printf("key valid: %d compressed: %d\n", key.IsValid(), key.IsCompressed());
printf("pubkey: %s\n", EncodeBase64(&pubkey[0], pubkey.size()).c_str() );
// compute message hash and sign it
CHashWriter ss(SER_GETHASH, PROTOCOL_VERSION);
ss << txNew.message;
uint256 hashMsg = ss.GetHash();
// vchSig is sig(hash(message))
vector<unsigned char> vchSig;
if (!key.Sign(hashMsg, vchSig)) {
printf("CreateNewBlock: Failed to sign SpamMessage\n");
return false;
}
/*
vector<unsigned char> vchSigCompact;
if (!key.SignCompact(hashMsg, vchSigCompact)) {
printf("CreateNewBlock: Failed to sign SpamMessage2\n");
return false;
}
printf("sign size: %zu signCompact size: %zu\n", vchSig.size(), vchSigCompact.size());
printf("signCompact: %s\n", EncodeBase64(&vchSigCompact[0], vchSigCompact.size()).c_str());
*/
printf("CreateSpamMsgTx: msg = %s user = %s hash = %s signedhash = %s\n", txNew.message.ToString().c_str(), strSpamUser.c_str(),
hashMsg.ToString().c_str(), EncodeBase64(&vchSig[0], vchSig.size()).c_str() );
// add username and signature
txNew.userName = CScript() << strSpamUser;
txNew.userName += CScript() << vchSig;
txNew.pubKey.clear(); // pubKey will be updated to include extranonce
txNew.nNonce = 0; // no update needed for spamMessage's nonce.
std::vector< std::vector<unsigned char> > vData;
txNew.userName.ExtractPushData(vData);
printf("Verify: %d\n", pubkey.Verify(hashMsg,vData[1]));
exit(1);
}
extern void GenesisMiner();
extern void noui_connect();
int main(int argc, char* argv[])
@ -190,7 +132,6 @@ int main(int argc, char* argv[])
fHaveGUI = false;
//GenesisMiner();
//TestCreateSpamMsgTx();
// Connect bitcoind signal handlers
noui_connect();

View File

@ -230,6 +230,7 @@ static const CRPCCommand vRPCCommands[] =
{ "submitblock", &submitblock, false, false },
{ "listsinceblock", &listsinceblock, false, false },
{ "dumpprivkey", &dumpprivkey, true, false },
{ "dumppubkey", &dumppubkey, false, false },
{ "dumpwallet", &dumpwallet, true, false },
{ "importprivkey", &importprivkey, false, false },
{ "importwallet", &importwallet, false, false },
@ -254,8 +255,7 @@ static const CRPCCommand vRPCCommands[] =
{ "getfollowing", &getfollowing, false, true },
{ "getlasthave", &getlasthave, false, true },
{ "listusernamespartial", &listusernamespartial, false, true },
{ "getdefaultuser", &getdefaultuser, false, true },
{ "setdefaultuser", &setdefaultuser, false, true },
{ "rescandirectmsgs", &rescandirectmsgs, false, true },
};
CRPCTable::CRPCTable()
@ -917,7 +917,8 @@ void JSONRequest::parse(const Value& valRequest)
throw JSONRPCError(RPC_INVALID_REQUEST, "Method must be a string");
strMethod = valMethod.get_str();
if (strMethod != "getwork" && strMethod != "getblocktemplate" &&
strMethod != "getlasthave")
strMethod != "getlasthave" &&
strMethod != "getinfo" && strMethod != "getbestblockhash" && strMethod != "getblock")
printf("ThreadRPCServer method=%s\n", strMethod.c_str());
// Parse params
@ -1244,6 +1245,7 @@ Array RPCConvertValues(const std::string &strMethod, const std::vector<std::stri
if (strMethod == "lockunspent" && n > 0) ConvertTo<bool>(params[0]);
if (strMethod == "lockunspent" && n > 1) ConvertTo<Array>(params[1]);
if (strMethod == "importprivkey" && n > 2) ConvertTo<bool>(params[2]);
if (strMethod == "importprivkey" && n > 3) ConvertTo<bool>(params[3]);
if (strMethod == "verifychain" && n > 0) ConvertTo<boost::int64_t>(params[0]);
if (strMethod == "verifychain" && n > 1) ConvertTo<boost::int64_t>(params[1]);
if (strMethod == "dhtput" && n > 3) ConvertToValue(params[3]);
@ -1261,6 +1263,7 @@ Array RPCConvertValues(const std::string &strMethod, const std::vector<std::stri
if (strMethod == "follow" && n > 1) ConvertTo<Array>(params[1]);
if (strMethod == "unfollow" && n > 1) ConvertTo<Array>(params[1]);
if (strMethod == "listusernamespartial" && n > 1) ConvertTo<boost::int64_t>(params[1]);
if (strMethod == "listusernamespartial" && n > 2) ConvertTo<bool>(params[2]);
return params;
}

View File

@ -146,6 +146,7 @@ extern json_spirit::Value addnode(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getaddednodeinfo(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value dumpprivkey(const json_spirit::Array& params, bool fHelp); // in rpcdump.cpp
extern json_spirit::Value dumppubkey(const json_spirit::Array& params, bool fHelp); // in rpcdump.cpp
extern json_spirit::Value importprivkey(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value dumpwallet(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value importwallet(const json_spirit::Array& params, bool fHelp);
@ -206,7 +207,6 @@ extern json_spirit::Value unfollow(const json_spirit::Array& params, bool fHelp)
extern json_spirit::Value getfollowing(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getlasthave(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value listusernamespartial(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getdefaultuser(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value setdefaultuser(const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value rescandirectmsgs(const json_spirit::Array& params, bool fHelp);
#endif

View File

@ -33,7 +33,7 @@ public:
nDefaultPort = 28333;
nRPCPort = 28332;
bnProofOfWorkLimit = CBigNum(~uint256(0) >> 1);
nTxBits = 0x1e03ffff;
nTxBits = 0x1e00ffff;
nSubsidyHalvingInterval = 210000;
// Build the genesis block. Note that the output of the genesis coinbase cannot

View File

@ -177,7 +177,7 @@ std::string HelpMessage()
strUsage += " -socks=<n> " + _("Select the version of socks proxy to use (4-5, default: 5)") + "\n";
strUsage += " -tor=<ip:port> " + _("Use proxy to reach tor hidden services (default: same as -proxy)") + "\n";
strUsage += " -dns " + _("Allow DNS lookups for -addnode, -seednode and -connect") + "\n";
strUsage += " -port=<port> " + _("Listen for connections on <port> (default: 8333 or testnet: 18333)") + "\n";
strUsage += " -port=<port> " + _("Listen for connections on <port> (default: 28333 or testnet: 18333)") + "\n";
strUsage += " -maxconnections=<n> " + _("Maintain at most <n> connections to peers (default: 125)") + "\n";
strUsage += " -addnode=<ip> " + _("Add a node to connect to and attempt to keep the connection open") + "\n";
strUsage += " -connect=<ip> " + _("Connect only to the specified node(s)") + "\n";
@ -220,7 +220,7 @@ std::string HelpMessage()
#endif
strUsage += " -rpcuser=<user> " + _("Username for JSON-RPC connections") + "\n";
strUsage += " -rpcpassword=<pw> " + _("Password for JSON-RPC connections") + "\n";
strUsage += " -rpcport=<port> " + _("Listen for JSON-RPC connections on <port> (default: 8332 or testnet: 18332)") + "\n";
strUsage += " -rpcport=<port> " + _("Listen for JSON-RPC connections on <port> (default: 28332 or testnet: 18332)") + "\n";
strUsage += " -rpcallowip=<ip> " + _("Allow JSON-RPC connections from specified IP address") + "\n";
if (!fHaveGUI)
strUsage += " -rpcconnect=<ip> " + _("Send commands to node running on <ip> (default: 127.0.0.1)") + "\n";

View File

@ -425,10 +425,45 @@ bool CheckTransaction(const CTransaction& tx, CValidationState &state)
{
// Basic checks that don't depend on any context
if (tx.IsSpamMessage()) {
if (tx.message.size() > MAX_SPAM_MSG_SIZE)
string spamMsg = tx.message.ExtractPushDataString(0);
if (!spamMsg.size())
return state.DoS(100, error("CheckTransaction() : invalid or empty spam message"));
if (spamMsg.size() > MAX_SPAM_MSG_SIZE)
return state.DoS(100, error("CheckTransaction() : spam message too big"));
// [MF] TODO: check message signature
// [MF] Problem: registration of this user may be in the block itself, better check later.
string spamUser = tx.userName.ExtractPushDataString(0);
if( spamUser != "nobody" ) {
string strSign = tx.userName.ExtractPushDataString(1);
if (!strSign.size())
return state.DoS(100, error("CheckTransaction() : spam signature missing"));
vector<unsigned char> vchSig((const unsigned char*)strSign.data(),
(const unsigned char*)strSign.data() + strSign.size());
CPubKey pubkey;
CTransaction txPubKey;
uint256 hashBlock;
uint256 userhash = SerializeHash(spamUser);
if( !GetTransaction(userhash, txPubKey, hashBlock) )
return state.DoS(100, error("CheckTransaction() : spam signed by unknown user"));
std::vector< std::vector<unsigned char> > vData;
if( !txPubKey.pubKey.ExtractPushData(vData) || vData.size() < 1 )
return state.DoS(100, error("CheckTransaction() : spam signed with broken pubkey"));
pubkey = CPubKey(vData[0]);
// compute message hash for signature checking
CHashWriter ss(SER_GETHASH, PROTOCOL_VERSION);
ss << strMessageMagic;
ss << tx.message;
CPubKey pubkeyRec;
if (!pubkeyRec.RecoverCompact(ss.GetHash(), vchSig))
return state.DoS(100, error("CheckTransaction() : RecoverCompact failed for spammsg"));
if (pubkeyRec.GetID() != pubkey.GetID())
return state.DoS(100, error("CheckTransaction() : spam signature verification failed"));
}
} else {
if (tx.userName.empty())
return state.DoS(10, error("CheckTransaction() : username empty"));
@ -1713,12 +1748,11 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
if( pblock->vtx[0].IsSpamMessage() ) {
string msg = pblock->vtx[0].message.ExtractPushDataString(0);
string user = pblock->vtx[0].userName.ExtractPushDataString(0);
// [MF] FIXME: validate user properly
if( msg.length() <= 140 ) {
// SpamMessage was already validated in CheckBlock => CheckTransation
printf("ProcessBlock: msg='%s' user='%s'\n", msg.c_str(), user.c_str());
receivedSpamMessage(msg, user);
}
}
return true;
}
@ -3492,10 +3526,12 @@ static bool CreateSpamMsgTx(CTransaction &txNew, std::vector<unsigned char> &sal
std::string strUsername = strSpamUser;
CKeyID keyID;
if( !pwalletMain->GetKeyIdFromUsername(strSpamUser, keyID) ) {
if( strSpamUser != "nobody" && !pwalletMain->GetKeyIdFromUsername(strSpamUser, keyID) ) {
if( pwalletMain->vchDefaultKey.IsValid() ) {
keyID = pwalletMain->vchDefaultKey.GetID();
strUsername = pwalletMain->mapKeyMetadata[keyID].username;
} else {
strUsername = "nobody";
}
}
printf("CreateSpamMsgTx: keyId = %s\n", keyID.ToString().c_str() );
@ -3534,6 +3570,10 @@ static bool CreateSpamMsgTx(CTransaction &txNew, std::vector<unsigned char> &sal
txNew.pubKey.clear(); // pubKey will be updated to include extranonce
txNew.nNonce = 0; // no update needed for spamMessage's nonce.
CValidationState state;
bool ret = CheckTransaction(txNew, state);
printf("CheckTransaction returned %d\n", ret );
return true;
}

View File

@ -382,7 +382,7 @@ bool static ConnectSocketDirectly(const CService &addrConnect, SOCKET& hSocketRe
}
if (nRet != 0)
{
printf("connect() failed after select(): %s\n",strerror(nRet));
//printf("connect() failed after select(): %s\n",strerror(nRet));
closesocket(hSocket);
return false;
}

View File

@ -9,6 +9,7 @@
#include "bitcoinrpc.h"
#include "ui_interface.h"
#include "base58.h"
#include "twister.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/lexical_cast.hpp>
@ -67,9 +68,9 @@ std::string DecodeDumpString(const std::string &str) {
Value importprivkey(const Array& params, bool fHelp)
{
if (fHelp || params.size() < 2 || params.size() > 3)
if (fHelp || params.size() < 2 || params.size() > 4)
throw runtime_error(
"importprivkey <bitcoinprivkey> <username> [rescan=true]\n"
"importprivkey <bitcoinprivkey> <username> [rescan=true] [allow_new_user=false]\n"
"Adds a private key (as returned by dumpprivkey) to your wallet.");
string strSecret = params[0].get_str();
@ -80,11 +81,24 @@ Value importprivkey(const Array& params, bool fHelp)
if (params.size() > 2)
fRescan = params[2].get_bool();
bool fAllowNewUser = false;
if (params.size() > 3)
fAllowNewUser = params[3].get_bool();
CBitcoinSecret vchSecret;
bool fGood = vchSecret.SetString(strSecret);
if (!fGood) throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Invalid private key");
if( !fAllowNewUser ) {
CTransaction txOut;
uint256 hashBlock;
uint256 userhash = SerializeHash(strUsername);
if( !GetTransaction(userhash, txOut, hashBlock) ) {
throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "User must exist (or allow_new_user flag must be set)");
}
}
CKey key = vchSecret.GetKey();
CPubKey pubkey = key.GetPubKey();
CKeyID vchAddress = pubkey.GetID();
@ -211,6 +225,26 @@ Value dumpprivkey(const Array& params, bool fHelp)
return CBitcoinSecret(vchSecret).ToString();
}
Value dumppubkey(const Array& params, bool fHelp)
{
if (fHelp || params.size() != 1)
throw runtime_error(
"dumppubkey <username>\n"
"Returns the public key corresponding to <username> (empty if user doesn't exist)");
string strUsername = params[0].get_str();
CPubKey pubkey;
bool gotKey = getUserPubKey(strUsername, pubkey);
if( !gotKey ) {
return "";
}
string strPubkey = string( reinterpret_cast<const char *>(pubkey.begin()), pubkey.size());
return strPubkey;
}
Value dumpwallet(const Array& params, bool fHelp)
{

View File

@ -153,6 +153,10 @@ Value listwalletusers(const Array& params, bool fHelp)
return ret;
}
/* [mf] no use for setting/getting defaultuser, it just adds confusion.
all commands should receive user as parameter (including the user for spammsg).
*/
#if 0
Value setdefaultuser(const Array& params, bool fHelp)
{
if (fHelp || params.size() != 1)
@ -199,7 +203,7 @@ Value getdefaultuser(const Array& params, bool fHelp)
return username;
}
#endif
Value signmessage(const Array& params, bool fHelp)
{

View File

@ -69,8 +69,10 @@ sha1_hash dhtTargetHash(std::string const &username, std::string const &resource
torrent_handle startTorrentUser(std::string const &username)
{
bool userInTxDb = usernameExists(username); // keep this outside cs_twister to avoid deadlock
LOCK(cs_twister);
if( !m_userTorrent.count(username) ) {
if( !m_userTorrent.count(username) && userInTxDb ) {
sha1_hash ih = dhtTargetHash(username, "tracker", "m");
printf("adding torrent for [%s,tracker]\n", username.c_str());
@ -111,6 +113,12 @@ int saveGlobalData(std::string const& filename)
globalDict["receivedSpamMsg"] = m_receivedSpamMsgStr;
globalDict["receivedSpamUser"] = m_receivedSpamUserStr;
globalDict["lastSpamTime"] = m_lastSpamTime;
globalDict["sendSpamMsg"] = strSpamMessage;
globalDict["sendSpamUser"] = strSpamUser;
globalDict["generate"] = GetBoolArg("-gen", false);
int genproclimit = GetArg("-genproclimit", -1);
if( genproclimit > 0 )
globalDict["genproclimit"] = genproclimit;
std::vector<char> buf;
bencode(std::back_inserter(buf), globalDict);
@ -131,6 +139,20 @@ int loadGlobalData(std::string const& filename)
m_receivedSpamMsgStr = userDict.dict_find_string_value("receivedSpamMsg");
m_receivedSpamUserStr = userDict.dict_find_string_value("receivedSpamUser");
m_lastSpamTime = userDict.dict_find_int_value("lastSpamTime");
string sendSpamMsg = userDict.dict_find_string_value("sendSpamMsg");
if( sendSpamMsg.size() ) strSpamMessage = sendSpamMsg;
string sendSpamUser = userDict.dict_find_string_value("sendSpamUser");
if( sendSpamUser.size() ) strSpamUser = sendSpamUser;
bool generate = userDict.dict_find_int_value("generate");
int genproclimit = userDict.dict_find_int_value("genproclimit");
if( generate ) {
Array params;
params.push_back( generate );
if( genproclimit > 0 )
params.push_back( genproclimit );
setgenerate(params, false);
}
return 0;
}
@ -249,14 +271,16 @@ void ThreadMaintainDHTNodes()
while(1) {
MilliSleep(5000);
session_status ss = ses->status();
int dht_nodes = ss.dht_nodes;
bool nodesAdded = false;
if( ses ) {
LOCK(cs_vNodes);
vector<CAddress> vAddr = addrman.GetAddr();
session_status ss = ses->status();
int totalNodesCandidates = (int)(vNodes.size() + vAddr.size());
if( (!ss.dht_nodes && totalNodesCandidates) ||
ss.dht_nodes < totalNodesCandidates / 2 ) {
if( (!dht_nodes && totalNodesCandidates) ||
(dht_nodes < 5 && totalNodesCandidates > 10) ) {
printf("ThreadMaintainDHTNodes: too few dht_nodes, trying to add some...\n");
BOOST_FOREACH(const CAddress &a, vAddr) {
std::string addr = a.ToStringIP();
@ -269,7 +293,7 @@ void ThreadMaintainDHTNodes()
// if !fInbound we created this connection so ip is reachable.
// we can't use port number of inbound connection, so try standard port.
// only use inbound as last resort (if dht_nodes empty)
if( !pnode->fInbound || !ss.dht_nodes ) {
if( !pnode->fInbound || !dht_nodes ) {
std::string addr = pnode->addr.ToStringIP();
int port = (!pnode->fInbound) ? pnode->addr.GetPort() : Params().GetDefaultPort();
port += LIBTORRENT_PORT_OFFSET;
@ -283,10 +307,19 @@ void ThreadMaintainDHTNodes()
}
}
if( nodesAdded ) {
MilliSleep(5000);
ss = ses->status();
if( ss.dht_nodes > dht_nodes ) {
// new nodes were added to dht: force updating peers from dht so torrents may start faster
LOCK(cs_twister);
BOOST_FOREACH(const PAIRTYPE(std::string, torrent_handle)& item, m_userTorrent) {
item.second.force_dht_announce();
}
} else {
// nodes added but dht ignored them, so they are probably duplicated.
// we sleep a bit as a punishment :-)
MilliSleep(30000);
}
}
}
}
@ -489,7 +522,6 @@ void stopSessionTorrent()
// save_resume_data will generate an alert when it's done
st.handle.save_resume_data();
++num_outstanding_resume_data;
printf("\r%d ", num_outstanding_resume_data);
}
printf("\nwaiting for resume data [%d]\n", num_outstanding_resume_data);
while (num_outstanding_resume_data > 0)
@ -658,12 +690,16 @@ bool processReceivedDM(lazy_entry const* post)
LOCK(cs_twister);
// store this dm in memory list, but prevent duplicates
std::vector<StoredDirectMsg> &dmsFromToUser = m_users[item.second.username].m_directmsg[n];
std::vector<StoredDirectMsg>::const_iterator it;
std::vector<StoredDirectMsg>::iterator it;
for( it = dmsFromToUser.begin(); it != dmsFromToUser.end(); ++it ) {
if( stoDM.m_utcTime == (*it).m_utcTime &&
stoDM.m_text == (*it).m_text ) {
break;
}
if( stoDM.m_utcTime < (*it).m_utcTime ) {
dmsFromToUser.insert(it, stoDM);
break;
}
}
if( it == dmsFromToUser.end() ) {
dmsFromToUser.push_back(stoDM);
@ -947,7 +983,8 @@ bool shouldDhtResourceExpire(std::string resource, bool multi, int height)
if( m_noExpireResources[resourceBasic] == PostNoExpireRecent &&
(height + BLOCK_AGE_TO_EXPIRE_DHT_POSTS) < getBestHeight() ) {
#ifdef DEBUG_EXPIRE_DHT_ITEM
printf("shouldDhtResourceExpire: expiring old post resource '%s'\n", resource.c_str());
printf("shouldDhtResourceExpire: expiring old post resource '%s' (height %d cur %d)\n",
resource.c_str(), height, getBestHeight());
#endif
return true;
}
@ -1262,6 +1299,8 @@ Value newrtmsg(const Array& params, bool fHelp)
// post to dht as well
ses->dht_putData(strUsername, string("post")+strK, false,
v, strUsername, GetAdjustedTime(), 1);
ses->dht_putData(strUsername, string("status"), false,
v, strUsername, GetAdjustedTime(), k);
// notification to keep track of RTs of the original post
if( rt ) {
@ -1532,13 +1571,16 @@ Value getlasthave(const Array& params, bool fHelp)
Value listusernamespartial(const Array& params, bool fHelp)
{
if (fHelp || (params.size() != 2))
if (fHelp || (params.size() < 2 || params.size() > 3))
throw runtime_error(
"listusernamespartial <username_starts_with> <count>\n"
"listusernamespartial <username_starts_with> <count> [exact_match=false]\n"
"get list of usernames starting with");
string userStartsWith = params[0].get_str();
size_t count = params[1].get_int();
bool exact_match = false;
if( params.size() > 2 )
exact_match = params[2].get_bool();
set<string> retStrings;
@ -1548,7 +1590,11 @@ Value listusernamespartial(const Array& params, bool fHelp)
BOOST_FOREACH(const PAIRTYPE(CKeyID, CKeyMetadata)& item, pwalletMain->mapKeyMetadata) {
LOCK(cs_twister);
BOOST_FOREACH(const string &user, m_users[item.second.username].m_following) {
int toCompare = std::min( userStartsWith.size(), user.size() );
if( (exact_match && userStartsWith.size() != user.size()) ||
userStartsWith.size() > user.size() ) {
continue;
}
int toCompare = userStartsWith.size();
if( memcmp( user.data(), userStartsWith.data(), toCompare ) == 0 )
retStrings.insert( user );
if( retStrings.size() >= count )
@ -1566,7 +1612,11 @@ Value listusernamespartial(const Array& params, bool fHelp)
BOOST_FOREACH(const CTransaction&tx, block.vtx) {
if( !tx.IsSpamMessage() ) {
string txUsername = tx.userName.ExtractSmallString();
int toCompare = std::min( userStartsWith.size(), txUsername.size() );
if( (exact_match && userStartsWith.size() != txUsername.size()) ||
userStartsWith.size() > txUsername.size() ) {
continue;
}
int toCompare = userStartsWith.size();
if( memcmp( txUsername.data(), userStartsWith.data(), toCompare ) == 0 )
retStrings.insert( txUsername );
if( retStrings.size() >= count )
@ -1583,3 +1633,33 @@ Value listusernamespartial(const Array& params, bool fHelp)
return ret;
}
Value rescandirectmsgs(const Array& params, bool fHelp)
{
if (fHelp || (params.size() != 1))
throw runtime_error(
"rescandirectmsgs <username>\n"
"rescan all streams of users we follow for new and old directmessages");
string localUser = params[0].get_str();
std::set<std::string> following;
{
LOCK(cs_twister);
following = m_users[localUser].m_following;
}
BOOST_FOREACH(string username, following) {
torrent_handle torrent;
{
LOCK(cs_twister);
if( username.size() && m_userTorrent.count(username) )
torrent = m_userTorrent[username];
}
if( torrent.is_valid() ){
torrent.recheck_pieces(USERPOST_FLAG_DM);
}
}
return Value();
}

View File

@ -11,7 +11,7 @@
#define USERPOST_FLAG_DM 0x02
#define BLOCK_AGE_TO_EXPIRE_DHT_ENTRY (2016) // about 2 weeks
#define BLOCK_AGE_TO_EXPIRE_DHT_POSTS (4320*6) // about 6 months
#define BLOCK_AGE_TO_EXPIRE_DHT_POSTS (4320*2) // about 2 months
class twister
@ -23,6 +23,7 @@ public:
void startSessionTorrent(boost::thread_group& threadGroup);
void stopSessionTorrent();
bool getUserPubKey(std::string const &strUsername, CPubKey &pubkey);
std::string createSignature(std::string const &strMessage, CKeyID &keyID);
std::string createSignature(std::string const &strMessage, std::string const &strUsername);
bool verifySignature(std::string const &strMessage, std::string const &strUsername, std::string const &strSign);

View File

@ -137,8 +137,8 @@ int saveUserData(std::string const& filename, std::map<std::string,UserData> con
for (i = users.begin(); i != users.end(); ++i) {
UserData const &udata = i->second;
entry &userData = userDict[i->first];
if( udata.m_following.size() ) {
entry &userData = userDict[i->first];
entry &followingList = userData["following"];
BOOST_FOREACH( std::string const &n, udata.m_following) {
followingList.list().push_back(n);
@ -146,6 +146,7 @@ int saveUserData(std::string const& filename, std::map<std::string,UserData> con
}
if( udata.m_directmsg.size() ) {
entry &userData = userDict[i->first];
entry &dmDict = userData["dm"];
std::map<std::string, std::vector<StoredDirectMsg> >::const_iterator j;