create structure per local user to keep m_following and directmsgs

add proper mutexes to twister.cpp
This commit is contained in:
Miguel Freitas 2013-10-08 15:39:56 -03:00
parent 91ba50b836
commit f4939a7874
4 changed files with 142 additions and 71 deletions

View File

@ -1238,8 +1238,8 @@ Array RPCConvertValues(const std::string &strMethod, const std::vector<std::stri
if (strMethod == "newrtmsg" && n > 2) ConvertTo<Object>(params[2]); if (strMethod == "newrtmsg" && n > 2) ConvertTo<Object>(params[2]);
if (strMethod == "getposts" && n > 0) ConvertTo<boost::int64_t>(params[0]); if (strMethod == "getposts" && n > 0) ConvertTo<boost::int64_t>(params[0]);
if (strMethod == "getposts" && n > 1) ConvertTo<Array>(params[1]); if (strMethod == "getposts" && n > 1) ConvertTo<Array>(params[1]);
if (strMethod == "follow" && n > 0) ConvertTo<Array>(params[0]); if (strMethod == "follow" && n > 1) ConvertTo<Array>(params[1]);
if (strMethod == "unfollow" && n > 0) ConvertTo<Array>(params[0]); if (strMethod == "unfollow" && n > 1) ConvertTo<Array>(params[1]);
if (strMethod == "listusernamespartial" && n > 1) ConvertTo<boost::int64_t>(params[1]); if (strMethod == "listusernamespartial" && n > 1) ConvertTo<boost::int64_t>(params[1]);
return params; return params;

View File

@ -143,6 +143,7 @@ Value listwalletusers(const Array& params, bool fHelp)
// Find all addresses that have the given account // Find all addresses that have the given account
Array ret; Array ret;
LOCK(pwalletMain->cs_wallet);
BOOST_FOREACH(const PAIRTYPE(CKeyID, CKeyMetadata)& item, pwalletMain->mapKeyMetadata) BOOST_FOREACH(const PAIRTYPE(CKeyID, CKeyMetadata)& item, pwalletMain->mapKeyMetadata)
{ {
ret.push_back(item.second.username); ret.push_back(item.second.username);

View File

@ -38,12 +38,30 @@ static int num_outstanding_resume_data;
static CCriticalSection cs_dhtgetMap; static CCriticalSection cs_dhtgetMap;
static map<sha1_hash, alert_manager*> m_dhtgetMap; static map<sha1_hash, alert_manager*> m_dhtgetMap;
static CCriticalSection cs_twister;
static map<std::string, bool> m_specialResources; static map<std::string, bool> m_specialResources;
static map<std::string, torrent_handle> m_userTorrent; static map<std::string, torrent_handle> m_userTorrent;
static std::set<std::string> m_following;
static std::string preferredSpamLang = "[en]"; static std::string m_preferredSpamLang = "[en]";
static std::string receivedSpamMsgStr; static std::string m_receivedSpamMsgStr;
static std::string receivedSpamUserStr; static std::string m_receivedSpamUserStr;
// in-memory unencrypted DMs
struct StoredDirectMsg {
int64 m_utcTime;
std::string m_text;
bool m_fromMe;
};
// in-memory data per wallet user
struct UserData {
std::set<std::string> m_following;
// m_directmsg key is the other username
std::map<std::string, std::list<StoredDirectMsg> > m_directmsg;
};
static std::map<std::string,UserData> m_users;
sha1_hash dhtTargetHash(std::string const &username, std::string const &resource, std::string const &type) sha1_hash dhtTargetHash(std::string const &username, std::string const &resource, std::string const &type)
{ {
@ -59,6 +77,7 @@ sha1_hash dhtTargetHash(std::string const &username, std::string const &resource
torrent_handle startTorrentUser(std::string const &username) torrent_handle startTorrentUser(std::string const &username)
{ {
LOCK(cs_twister);
if( !m_userTorrent.count(username) ) { if( !m_userTorrent.count(username) ) {
sha1_hash ih = dhtTargetHash(username, "tracker", "m"); sha1_hash ih = dhtTargetHash(username, "tracker", "m");
@ -536,6 +555,47 @@ bool verifySignature(std::string const &strMessage, std::string const &strUserna
return (pubkeyRec.GetID() == pubkey.GetID()); return (pubkeyRec.GetID() == pubkey.GetID());
} }
bool processReceivedDM(lazy_entry const* post)
{
lazy_entry const* dm = post->dict_find_dict("dm");
if( dm ) {
ecies_secure_t sec;
sec.key = dm->dict_find_string_value("key");
sec.mac = dm->dict_find_string_value("mac");
sec.orig = dm->dict_find_int_value("orig");
sec.body = dm->dict_find_string_value("body");
LOCK(pwalletMain->cs_wallet);
BOOST_FOREACH(const PAIRTYPE(CKeyID, CKeyMetadata)& item, pwalletMain->mapKeyMetadata)
{
CKey key;
if (!pwalletMain->GetKey(item.first, key)) {
printf("acceptSignedPost: private key not available trying to decrypt DM.\n");
} else {
std::string textOut;
if( key.Decrypt(sec, textOut) ) {
printf("Received DM for user '%s' text = '%s'\n",
item.second.username.c_str(),
textOut.c_str());
std::string n = post->dict_find_string_value("n");
StoredDirectMsg stoDM;
stoDM.m_fromMe = false;
stoDM.m_text = textOut;
stoDM.m_utcTime = post->dict_find_int_value("time");;
LOCK(cs_twister);
m_users[item.second.username].m_directmsg[n].push_back(stoDM);
return true;
}
}
}
}
return false;
}
bool acceptSignedPost(char const *data, int data_size, std::string username, int seq, std::string &errmsg, boost::uint32_t *flags) bool acceptSignedPost(char const *data, int data_size, std::string username, int seq, std::string &errmsg, boost::uint32_t *flags)
{ {
bool ret = false; bool ret = false;
@ -600,27 +660,7 @@ bool acceptSignedPost(char const *data, int data_size, std::string username, int
lazy_entry const* dm = post->dict_find_dict("dm"); lazy_entry const* dm = post->dict_find_dict("dm");
if( dm && flags ) { if( dm && flags ) {
(*flags) |= USERPOST_FLAG_DM; (*flags) |= USERPOST_FLAG_DM;
processReceivedDM(post);
ecies_secure_t sec;
sec.key = dm->dict_find_string_value("key");
sec.mac = dm->dict_find_string_value("mac");
sec.orig = dm->dict_find_int_value("orig");
sec.body = dm->dict_find_string_value("body");
BOOST_FOREACH(const PAIRTYPE(CKeyID, CKeyMetadata)& item, pwalletMain->mapKeyMetadata)
{
CKey key;
if (!pwalletMain->GetKey(item.first, key)) {
printf("acceptSignedPost: private key not available trying to decrypt DM.\n");
} else {
std::string textOut;
if( key.Decrypt(sec, textOut) ) {
printf("Received DM for user '%s' text = '%s'\n",
item.second.username.c_str(),
textOut.c_str());
}
}
}
} }
} }
} }
@ -754,10 +794,11 @@ int getBestHeight()
void receivedSpamMessage(std::string const &message, std::string const &user) void receivedSpamMessage(std::string const &message, std::string const &user)
{ {
if( !receivedSpamMsgStr.length() || LOCK(cs_twister);
(preferredSpamLang.length() && message.find(preferredSpamLang) != string::npos) ) { if( !m_receivedSpamMsgStr.length() ||
receivedSpamMsgStr = message; (m_preferredSpamLang.length() && message.find(m_preferredSpamLang) != string::npos) ) {
receivedSpamUserStr = user; m_receivedSpamMsgStr = message;
m_receivedSpamUserStr = user;
} }
} }
@ -957,6 +998,15 @@ Value newdirectmsg(const Array& params, bool fHelp)
torrent_handle h = startTorrentUser(strFrom); torrent_handle h = startTorrentUser(strFrom);
h.add_piece(k,buf.data(),buf.size()); h.add_piece(k,buf.data(),buf.size());
StoredDirectMsg stoDM;
stoDM.m_fromMe = true;
stoDM.m_text = strMsg;
stoDM.m_utcTime = v["userpost"]["time"].integer();
{
LOCK(cs_twister);
m_users[strFrom].m_directmsg[strTo].push_back(stoDM);
}
return entryToJson(v); return entryToJson(v);
} }
@ -1067,26 +1117,29 @@ Value getposts(const Array& params, bool fHelp)
ret.push_back( entryToJson(rit->second) ); ret.push_back( entryToJson(rit->second) );
} }
if( receivedSpamMsgStr.length() ) { {
// we must agree on an acceptable level here LOCK(cs_twister);
if( rand() < (RAND_MAX/10) ) { if( m_receivedSpamMsgStr.length() ) {
entry v; // we must agree on an acceptable level here
entry &userpost = v["userpost"]; if( rand() < (RAND_MAX/10) ) {
entry v;
entry &userpost = v["userpost"];
userpost["n"] = receivedSpamUserStr; userpost["n"] = m_receivedSpamUserStr;
userpost["k"] = 1; userpost["k"] = 1;
userpost["time"] = GetAdjustedTime(); userpost["time"] = GetAdjustedTime();
userpost["height"] = getBestHeight(); userpost["height"] = getBestHeight();
userpost["msg"] = receivedSpamMsgStr; userpost["msg"] = m_receivedSpamMsgStr;
unsigned char vchSig[65]; unsigned char vchSig[65];
RAND_bytes(vchSig,sizeof(vchSig)); RAND_bytes(vchSig,sizeof(vchSig));
v["sig_userpost"] = std::string((const char *)vchSig, sizeof(vchSig)); v["sig_userpost"] = std::string((const char *)vchSig, sizeof(vchSig));
ret.insert(ret.begin(),entryToJson(v)); ret.insert(ret.begin(),entryToJson(v));
}
m_receivedSpamMsgStr = "";
m_receivedSpamUserStr = "";
} }
receivedSpamMsgStr = "";
receivedSpamUserStr = "";
} }
return ret; return ret;
@ -1102,6 +1155,7 @@ Value setspammsg(const Array& params, bool fHelp)
string strUsername = params[0].get_str(); string strUsername = params[0].get_str();
string strMsg = params[1].get_str(); string strMsg = params[1].get_str();
LOCK(cs_twister);
strSpamUser = strUsername; strSpamUser = strUsername;
strSpamMessage = strMsg; strSpamMessage = strMsg;
@ -1116,6 +1170,7 @@ Value getspammsg(const Array& params, bool fHelp)
"get spam message attached to generated blocks"); "get spam message attached to generated blocks");
Array ret; Array ret;
LOCK(cs_twister);
ret.push_back(strSpamUser); ret.push_back(strSpamUser);
ret.push_back(strSpamMessage); ret.push_back(strSpamMessage);
@ -1124,24 +1179,26 @@ Value getspammsg(const Array& params, bool fHelp)
Value follow(const Array& params, bool fHelp) Value follow(const Array& params, bool fHelp)
{ {
if (fHelp || (params.size() != 1)) if (fHelp || (params.size() != 2))
throw runtime_error( throw runtime_error(
"follow [username1,username2,...]\n" "follow <username> [follow_username1,follow_username2,...]\n"
"start following users"); "start following users");
Array users = params[0].get_array(); string localUser = params[0].get_str();
Array users = params[1].get_array();
LOCK(cs_twister);
for( unsigned int u = 0; u < users.size(); u++ ) { for( unsigned int u = 0; u < users.size(); u++ ) {
string username = users[u].get_str(); string username = users[u].get_str();
if( !m_following.count(username) ) { if( !m_users[localUser].m_following.count(username) ) {
if( m_userTorrent.count(username) ) { if( m_userTorrent.count(username) ) {
// perhaps torrent is already initialized due to neighborhood // perhaps torrent is already initialized due to neighborhood
m_following.insert(username); m_users[localUser].m_following.insert(username);
} else { } else {
torrent_handle h = startTorrentUser(username); torrent_handle h = startTorrentUser(username);
if( h.is_valid() ) { if( h.is_valid() ) {
m_following.insert(username); m_users[localUser].m_following.insert(username);
} }
} }
} }
@ -1152,18 +1209,20 @@ Value follow(const Array& params, bool fHelp)
Value unfollow(const Array& params, bool fHelp) Value unfollow(const Array& params, bool fHelp)
{ {
if (fHelp || (params.size() != 1)) if (fHelp || (params.size() != 2))
throw runtime_error( throw runtime_error(
"unfollow [username1,username2,...]\n" "unfollow <username> [unfollow_username1,unfollow_username2,...]\n"
"stop following users"); "stop following users");
Array users = params[0].get_array(); string localUser = params[0].get_str();
Array users = params[1].get_array();
LOCK(cs_twister);
for( unsigned int u = 0; u < users.size(); u++ ) { for( unsigned int u = 0; u < users.size(); u++ ) {
string username = users[u].get_str(); string username = users[u].get_str();
if( m_following.count(username) ) { if( m_users[localUser].m_following.count(username) ) {
m_following.erase(username); m_users[localUser].m_following.erase(username);
} }
} }
@ -1172,13 +1231,16 @@ Value unfollow(const Array& params, bool fHelp)
Value getfollowing(const Array& params, bool fHelp) Value getfollowing(const Array& params, bool fHelp)
{ {
if (fHelp || (params.size() != 0)) if (fHelp || (params.size() != 1))
throw runtime_error( throw runtime_error(
"getfollowing\n" "getfollowing <username>\n"
"get list of users we follow"); "get list of users we follow");
string localUser = params[0].get_str();
Array ret; Array ret;
BOOST_FOREACH(string username, m_following) { LOCK(cs_twister);
BOOST_FOREACH(string username, m_users[localUser].m_following) {
ret.push_back(username); ret.push_back(username);
} }
@ -1187,13 +1249,16 @@ Value getfollowing(const Array& params, bool fHelp)
Value getlasthave(const Array& params, bool fHelp) Value getlasthave(const Array& params, bool fHelp)
{ {
if (fHelp || (params.size() != 0)) if (fHelp || (params.size() != 1))
throw runtime_error( throw runtime_error(
"getfollowing\n" "getlasthave <username>\n"
"get last 'have' (higher post number) of each user user we follow"); "get last 'have' (higher post number) of each user user we follow");
string localUser = params[0].get_str();
Object ret; Object ret;
BOOST_FOREACH(string username, m_following) { LOCK(cs_twister);
BOOST_FOREACH(string username, m_users[localUser].m_following) {
ret.push_back(Pair(username,lastPostKfromTorrent(username))); ret.push_back(Pair(username,lastPostKfromTorrent(username)));
} }
@ -1213,13 +1278,18 @@ Value listusernamespartial(const Array& params, bool fHelp)
set<string> retStrings; set<string> retStrings;
// priorize users in following list // priorize users in following list
BOOST_FOREACH(const string &user, m_following)
{ {
int toCompare = std::min( userStartsWith.size(), user.size() ); LOCK(pwalletMain->cs_wallet);
if( memcmp( user.data(), userStartsWith.data(), toCompare ) == 0 ) BOOST_FOREACH(const PAIRTYPE(CKeyID, CKeyMetadata)& item, pwalletMain->mapKeyMetadata) {
retStrings.insert( user ); LOCK(cs_twister);
if( retStrings.size() >= count ) BOOST_FOREACH(const string &user, m_users[item.second.username].m_following) {
break; int toCompare = std::min( userStartsWith.size(), user.size() );
if( memcmp( user.data(), userStartsWith.data(), toCompare ) == 0 )
retStrings.insert( user );
if( retStrings.size() >= count )
break;
}
}
} }
// now the rest, the entire block chain // now the rest, the entire block chain

View File

@ -11,7 +11,7 @@ n = int(sys.argv[2])
datadir = "/tmp/twister%d" % n datadir = "/tmp/twister%d" % n
port = "%d" % (30000+n) port = "%d" % (30000+n)
rpcport = "%d" % (40000+n) rpcport = "%d" % (40000+n)
rpcline = " -rpcuser=user -rpcpassword=pwd -rpcallowip=127.0.0.1 -rpcport=" rpcline = " -genproclimit=1 -rpcuser=user -rpcpassword=pwd -rpcallowip=127.0.0.1 -rpcport="
rpccfg = rpcline + rpcport rpccfg = rpcline + rpcport
rpccfg1 = rpcline + "40001" rpccfg1 = rpcline + "40001"