diff --git a/Makefile.am b/Makefile.am index 3bdbbac8..e842d21c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -189,6 +189,7 @@ twisterd_LDADD = $(LIBLEVELDB) $(LIBMEMENV) \ AM_CPPFLAGS = -ftemplate-depth-100 -DBOOST_SPIRIT_THREADSAFE -D_FILE_OFFSET_BITS=64 \ -I$(top_srcdir)/libtorrent/include \ + -I$(top_srcdir)/websocketpp \ -I$(top_srcdir)/src \ -I$(top_srcdir)/src/leveldb/include -I$(top_srcdir)/src/leveldb/helpers \ @DEBUGFLAGS@ @BOOST_CPPFLAGS@ @OPENSSL_INCLUDES@ @DB_CXX_CPPFLAGS@ diff --git a/bootstrap.sh b/bootstrap.sh index 3920cec8..6b2c9cda 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -1,5 +1,7 @@ #!/bin/sh +git submodule update --init + ./autotool.sh ./configure $@ diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index d371faef..31d3f971 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -35,6 +35,29 @@ #include #include +#include +#include +#include +#include +#include +#include + +typedef websocketpp::server wsserver; +typedef websocketpp::client wsclient; + +typedef websocketpp::server swsserver; +typedef websocketpp::client swsclient; + +using websocketpp::lib::placeholders::_1; +using websocketpp::lib::placeholders::_2; + +typedef websocketpp::config::asio_client::message_type::ptr message_ptr; +typedef websocketpp::lib::shared_ptr context_ptr; + +wsserver wss; +swsserver swss; +vector wsconnections; + using namespace std; using namespace boost; using namespace boost::asio; @@ -290,6 +313,7 @@ static const CRPCCommand vRPCCommands[] = { "uidtousername", &uidtousername, false, true, true }, { "newshorturl", &newshorturl, false, true, false }, { "decodeshorturl", &decodeshorturl, false, true, true }, +// { "openwebsocket", &openwebsocket, false, true, true }, }; CRPCTable::CRPCTable() @@ -759,8 +783,127 @@ static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptor } } +template +void on_message_server(WST *s, websocketpp::connection_hdl hdl, message_ptr msg); +template +void on_connection(WST *s, websocketpp::connection_hdl hdl) +{ + wsconnections.push_back(hdl); + + s->send(hdl, "{\"result\":\"twister WEB Socket...\"}", websocketpp::frame::opcode::text); +} +template +void on_close(WST *s, websocketpp::connection_hdl hdl) +{ + typename WST::connection_ptr con = s->get_con_from_hdl(hdl); + vector::iterator it; + for (it = wsconnections.begin(); it != wsconnections.end(); ++it) + { + if (s->get_con_from_hdl(*it) == con) + { + wsconnections.erase(it); + break; + } + } +} + +context_ptr on_tls_init(websocketpp::connection_hdl hdl) +{ + namespace asio = websocketpp::lib::asio; + + context_ptr ctx = websocketpp::lib::make_shared(asio::ssl::context::sslv23); + + try + { + ctx->set_options(asio::ssl::context::default_workarounds | + asio::ssl::context::no_sslv2 | + asio::ssl::context::no_sslv3 | + asio::ssl::context::single_dh_use); + + ctx->use_certificate_chain_file(GetArg("-rpcsslcertificatechainfile", "server.cert")); + ctx->use_private_key_file(GetArg("-rpcsslprivatekeyfile", "server.pem"), asio::ssl::context::pem); + + std::string ciphers = GetArg("-rpcsslciphers", "TLSv1+HIGH:!SSLv2:!aNULL:!eNULL:!AH:!3DES:@STRENGTH"); + + if (SSL_CTX_set_cipher_list(ctx->native_handle() , ciphers.c_str()) != 1) + std::cout << "Error setting cipher list" << std::endl; + } + catch (std::exception& e) + { + std::cout << "Exception: " << e.what() << std::endl; + } + return ctx; +} + +void set_ws_tls_handler(swsserver &ws) +{ + ws.set_tls_init_handler(websocketpp::lib::bind(&on_tls_init, ::_1)); +} + +void set_ws_tls_handler(wsserver &ws){} + +void set_ws_tls_handler(swsclient &ws) +{ + ws.set_tls_init_handler(websocketpp::lib::bind(&on_tls_init, ::_1)); +} + +void set_ws_tls_handler(wsclient &ws){} + +template +void StartWSServer(WST &ws) +{ + //wsserver ws; + + try { + // Set logging settings + //ws.set_access_channels(websocketpp::log::alevel::all); + //ws.clear_access_channels(websocketpp::log::alevel::frame_payload); + ws.set_access_channels(websocketpp::log::alevel::none); + ws.clear_access_channels(websocketpp::log::alevel::all); + + + // Initialize Asio + ws.init_asio(); + + // Register our message handler + ws.set_message_handler(websocketpp::lib::bind(&on_message_server, &ws, ::_1, ::_2)); + ws.set_open_handler(websocketpp::lib::bind(&on_connection, &ws, ::_1)); + ws.set_close_handler(websocketpp::lib::bind(&on_close, &ws, ::_1)); + + set_ws_tls_handler(ws); + + // Listen + ws.listen(GetArg("-wsport", GetArg("-rpcport", Params().RPCPort()) + 1000)); + + // Start the server accept loop + ws.start_accept(); + + // Start the ASIO io_service run loop + ws.run(); + } + catch (websocketpp::exception const & e) + { + std::cout << e.what() << std::endl; + } + catch (...) + { + std::cout << "other exception" << std::endl; + } +} + void StartRPCThreads() { + const bool fUseSSL = GetBoolArg("-rpcssl", false); + if (GetBoolArg("-websocket", false)) + { + if (fUseSSL) + boost::thread wst(boost::bind(&StartWSServer, boost::ref(swss))); + else + boost::thread wst(boost::bind(&StartWSServer, boost::ref(wss))); + } + if (!GetBoolArg("-jsonrpc", true)) + return; + strRPCUserColonPass = mapArgs["-rpcuser"] + ":" + mapArgs["-rpcpassword"]; if (((mapArgs["-rpcpassword"] == "") || (mapArgs["-rpcuser"] == mapArgs["-rpcpassword"])) && Params().RequireRPCPassword()) @@ -796,8 +939,6 @@ void StartRPCThreads() rpc_io_service = new asio::io_service(); rpc_ssl_context = new ssl::context(ssl::context::sslv23); - const bool fUseSSL = GetBoolArg("-rpcssl", false); - if (fUseSSL) { rpc_ssl_context->set_options(ssl::context::no_sslv2); @@ -907,7 +1048,7 @@ void RPCRunLater(const std::string& name, boost::function func, int6 boost::shared_ptr(new deadline_timer(*rpc_io_service)))); } deadlineTimers[name]->expires_from_now(posix_time::seconds(nSeconds)); - deadlineTimers[name]->async_wait(boost::bind(RPCRunHandler, _1, func)); + deadlineTimers[name]->async_wait(boost::bind(RPCRunHandler, boost::placeholders::_1, func)); } @@ -1025,7 +1166,7 @@ void ServiceConnection(AcceptedConnection *conn) if (mapHeaders["connection"] == "close") fRun = false; - + if(strMethod == "GET" && strURI == "/") strURI="/home.html"; @@ -1145,7 +1286,7 @@ json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_s const CRPCCommand *pcmd = tableRPC[strMethod]; if (!pcmd) throw JSONRPCError(RPC_METHOD_NOT_FOUND, "Method not found"); - + if(!pcmd->allowOnPublicServer && GetBoolArg("-public_server_mode",false)) throw JSONRPCError(RPC_FORBIDDEN_ON_PUBLIC_SERVER, "Forbidden: accessing this method is not allowed on a public server"); @@ -1175,9 +1316,152 @@ json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_s } } +template +void on_message_client(WST* c, websocketpp::connection_hdl hdl, message_ptr msg) +{ + string strPrint; + int nRet = 0; + + try + { + // Parse reply + Value valReply; + if (!read_string(msg->get_payload(), valReply)) + throw runtime_error("couldn't parse reply from server"); + const Object& reply = valReply.get_obj(); + if (reply.empty()) + throw runtime_error("expected reply to have result, error and id properties"); + + // Parse reply + const Value& result = find_value(reply, "result"); + const Value& error = find_value(reply, "error"); + + if (error.type() != null_type) + { + // Error + strPrint = "error: " + write_string(error, false); + int code = find_value(error.get_obj(), "code").get_int(); + nRet = abs(code); + } + else + { + // Result + if (result.type() == null_type) + strPrint = ""; + else if (result.type() == str_type) + strPrint = result.get_str(); + else + strPrint = write_string(result, true); + } + } + catch (boost::thread_interrupted) + { + throw; + } + catch (std::exception& e) { + strPrint = string("error: ") + e.what(); + nRet = 87; + } + catch (...) + { + PrintException(NULL, "CommandLineRPC()"); + } + + if (strPrint != "") + { + fprintf((nRet == 0 ? stdout : stderr), "%s\n", strPrint.c_str()); + } +} + +template +Object StartWSClient(bool fUseSSL) +{ + WST c; + + stringstream ssuri; + ssuri << "ws" << (fUseSSL ? "s" : "") <<"://" << GetArg("-rpcconnect", "127.0.0.1") << ":" << GetArg("-wsport", GetArg("-rpcport", Params().RPCPort()) + 1000); + + try + { + // Set logging to be pretty verbose (everything except message payloads) + //c.set_access_channels(websocketpp::log::alevel::all); + //c.clear_access_channels(websocketpp::log::alevel::frame_payload); + c.set_access_channels(websocketpp::log::alevel::none); + c.clear_access_channels(websocketpp::log::alevel::all); + + // Initialize ASIO + c.init_asio(); + //c.start_perpetual(); + boost::thread wsth(boost::bind(&WST::run, &c)); + + // Register our message handler + c.set_message_handler(websocketpp::lib::bind(&on_message_client, &c, ::_1, ::_2)); + set_ws_tls_handler(c); + + websocketpp::lib::error_code ec; + typename WST::connection_ptr con = c.get_connection(ssuri.str(), ec); + if (ec) { + std::cout << "could not create connection because: " << ec.message() << std::endl; + Object reply; + reply.push_back(Pair("error", ec.message())); + return reply; + } + + // Note that connect here only requests a connection. No network messages are + // exchanged until the event loop starts running in the next line. + c.connect(con); + + // Start the ASIO io_service run loop + // this will cause a single connection to be made to the server. c.run() + // will exit when this connection is closed. + //c.run(); + + string strCommand; + while (strCommand != "closewebsocket" && strCommand != "stop") + { + getline(cin, strCommand); + + vector tokens; + boost::algorithm::split(tokens, strCommand, boost::algorithm::is_any_of(" ")); + + // Method + if (tokens.size() < 1) + continue; + vector::iterator it = tokens.begin(); + string method = *it; + + // Parameters default to strings + vector strParams(++it, tokens.end()); + Array params = RPCConvertValues(method, strParams); + + string strRequest = JSONRPCRequest(method, params, 1); + c.send(con->get_handle(), strRequest, websocketpp::frame::opcode::text); + } + } + catch (websocketpp::exception const & e) + { + std::cout << e.what() << std::endl; + + Object reply; + reply.push_back(Pair("error", e.what())); + return reply; + } + Object reply; + reply.push_back(Pair("result", "OK")); + return reply; +} Object CallRPC(const string& strMethod, const Array& params) { + bool fUseSSL = GetBoolArg("-rpcssl", false); + if (strMethod == "openwebsocket") + { + if (fUseSSL) + return StartWSClient(fUseSSL); + else + return StartWSClient(fUseSSL); + } + if (mapArgs["-rpcuser"] == "" && mapArgs["-rpcpassword"] == "") throw runtime_error(strprintf( _("You must set rpcpassword= in the configuration file:\n%s\n" @@ -1185,7 +1469,6 @@ Object CallRPC(const string& strMethod, const Array& params) GetConfigFile().string().c_str())); // Connect to localhost - bool fUseSSL = GetBoolArg("-rpcssl", false); asio::io_service io_service; ssl::context context(ssl::context::sslv23); context.set_options(ssl::context::no_sslv2); @@ -1445,7 +1728,95 @@ int CommandLineRPC(int argc, char *argv[]) return nRet; } +template +void on_message_server(WST* s, websocketpp::connection_hdl hdl, message_ptr msg) +{ + /* + std::cout << "on_message called with hdl: " << hdl.lock().get() + << " and message: " << msg->get_payload() + << std::endl; + */ + + // check for a special command to instruct the server to stop listening so + // it can be cleanly exited. + if (msg->get_payload() == "closewebsocket") + { + s->stop_listening(); + return; + } + + JSONRequest jreq; + string strReply; + try + { + // Parse request + Value valRequest; + if (!read_string(msg->get_payload(), valRequest)) + throw JSONRPCError(RPC_PARSE_ERROR, "Parse error"); + + // singleton request + if (valRequest.type() == obj_type) + { + jreq.parse(valRequest); + + Value result = tableRPC.execute(jreq.strMethod, jreq.params); + + // Send reply + strReply = JSONRPCReply(result, Value::null, jreq.id); + + if (jreq.strMethod == "stop") + s->stop_listening(); + } + else if (valRequest.type() == array_type) + strReply = JSONRPCExecBatch(valRequest.get_array()); + else + throw JSONRPCError(RPC_PARSE_ERROR, "Top-level object parse error"); + } + catch (Object& objError) + { + strReply = JSONRPCReply(Value::null, objError, jreq.id); + } + catch (std::exception& e) + { + strReply = e.what(); + } + + try + { + s->send(hdl, strReply, msg->get_opcode()); + } + catch (const websocketpp::lib::error_code& e) + { + std::cout << "Echo failed because: " << e + << "(" << e.message() << ")" << std::endl; + } +} + +void WriteToWS(Value const& val) +{ + string strPrint; + bool fUseSSL = GetBoolArg("-rpcssl", false); + + if (val.type() == null_type) + return; + else if (val.type() == str_type) + strPrint = val.get_str(); + else + strPrint = write_string(val, false); + for (vector::iterator it = wsconnections.begin(); it != wsconnections.end(); ++it) + { + websocketpp::lib::error_code ec; + if (fUseSSL) + swss.send(*it, strPrint, websocketpp::frame::opcode::text, ec); + else + wss.send(*it, strPrint, websocketpp::frame::opcode::text, ec); + if (ec) + { + cout << "ERROR : " << ec.message() << endl << ec << endl; + } + } +} #ifdef TEST diff --git a/src/bitcoinrpc.h b/src/bitcoinrpc.h index c1d1f75a..20fecc3e 100644 --- a/src/bitcoinrpc.h +++ b/src/bitcoinrpc.h @@ -74,6 +74,8 @@ void StartRPCThreads(); void StopRPCThreads(); int CommandLineRPC(int argc, char *argv[]); +void WriteToWS(json_spirit::Value const& val); + /** Convert parameter values for RPC call from strings to command-specific JSON objects. */ json_spirit::Array RPCConvertValues(const std::string &strMethod, const std::vector &strParams); diff --git a/src/init.cpp b/src/init.cpp index d1653cb6..ac03a4dc 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -231,6 +231,7 @@ std::string HelpMessage() #ifdef WIN32 strUsage += " -printtodebugger " + _("Send trace/debug info to debugger") + "\n"; #endif + strUsage += " -jsonrpc " + _("Enable JSON-RPC service (default: 1)") + "\n"; strUsage += " -rpcuser= " + _("Username for JSON-RPC connections") + "\n"; strUsage += " -rpcpassword= " + _("Password for JSON-RPC connections") + "\n"; strUsage += " -rpcport= " + _("Listen for JSON-RPC connections on (default: 28332 or testnet: 18332)") + "\n"; @@ -238,6 +239,9 @@ std::string HelpMessage() if (!fHaveGUI) strUsage += " -rpcconnect= " + _("Send commands to node running on (default: 127.0.0.1)") + "\n"; strUsage += " -rpcthreads= " + _("Set the number of threads to service RPC calls (default: 10)") + "\n"; + strUsage += " -websocket " + _("Enables WEB Socket connections (default: 0)") + "\n"; + strUsage += " -wsport= " + _("Listen for WEB Socket on (default: rpcport+1000)") + "\n"; + strUsage += " -public_server_mode " + _("Limit JSON-RPC execution to public/safe commands only.") + "\n"; strUsage += " -blocknotify= " + _("Execute command when the best block changes (%s in cmd is replaced by block hash)") + "\n"; strUsage += " -walletnotify= " + _("Execute command when a wallet transaction changes (%s in cmd is replaced by TxID)") + "\n"; @@ -259,7 +263,7 @@ std::string HelpMessage() strUsage += " -blockprioritysize= " + _("Set maximum size of high-priority/low-fee transactions in bytes (default: 27000)") + "\n"; strUsage += "\n"; _("SSL options: (see the Bitcoin Wiki for SSL setup instructions)") + "\n"; - strUsage += " -rpcssl " + _("Use OpenSSL (https) for JSON-RPC connections") + "\n"; + strUsage += " -rpcssl " + _("Use OpenSSL (https) for JSON-RPC and/or WEB Socket connections") + "\n"; strUsage += " -rpcsslcertificatechainfile= " + _("Server certificate file (default: server.cert)") + "\n"; strUsage += " -rpcsslprivatekeyfile= " + _("Server private key (default: server.pem)") + "\n"; strUsage += " -rpcsslciphers= " + _("Acceptable ciphers (default: TLSv1+HIGH:!SSLv2:!aNULL:!eNULL:!AH:!3DES:@STRENGTH)") + "\n"; @@ -427,11 +431,11 @@ bool AppInit2(boost::thread_group& threadGroup) // to protect privacy, do not listen by default if a proxy server is specified SoftSetBoolArg("-listen", false); } - + if (mapArgs.count("-proxy") || mapArgs.count("-tor")) { SoftSetBoolArg("-dhtproxy", true); } - + if (!GetBoolArg("-listen", true)) { // do not map ports or try to retrieve public IP when not listening (pointless) SoftSetBoolArg("-upnp", false); diff --git a/src/twister.cpp b/src/twister.cpp index 26922c11..6a39e597 100644 --- a/src/twister.cpp +++ b/src/twister.cpp @@ -149,7 +149,7 @@ torrent_handle startTorrentUser(std::string const &username, bool following, int LOCK(cs_twister); if( m_peekTorrent.count(username) ) { - /* multiple paralel peek piece operations per torrent not + /* multiple paralel peek piece operations per torrent not * currently supported. return invalid handle for subsequent * requests, until current operation completes and torrent * is freed. */ @@ -331,7 +331,7 @@ void ThreadWaitExtIP() proxyType proxyInfoOut; m_usingProxy = GetProxy(NET_IPV4, proxyInfoOut); - printf("Creating new libtorrent session ext_ip=%s port=%d proxy=%s\n", + printf("Creating new libtorrent session ext_ip=%s port=%d proxy=%s\n", ipStr.c_str(), !m_usingProxy ? listen_port : 0, m_usingProxy ? proxyInfoOut.first.ToStringIPPort().c_str() : ""); @@ -368,7 +368,7 @@ void ThreadWaitExtIP() ses->start_upnp(); ses->start_natpmp(); } - + ses->listen_on(std::make_pair(listen_port, listen_port) , ec, bind_to_interface.c_str()); if (ec) @@ -377,20 +377,20 @@ void ThreadWaitExtIP() , bind_to_interface.empty() ? "" : " on ", bind_to_interface.c_str() , listen_port, listen_port+1, ec.message().c_str()); } - + dht_settings dhts; // settings to test local connections //dhts.restrict_routing_ips = false; //dhts.restrict_search_ips = false; ses->set_dht_settings(dhts); - + if( !DhtProxy::fEnabled ) { ses->start_dht(); } else { ses->stop_dht(); } } - + session_settings settings("twisterd/"+FormatFullVersion()); // settings to test local connections settings.allow_multiple_connections_per_ip = GetBoolArg("-multiconnperip", false); @@ -435,7 +435,7 @@ void ThreadWaitExtIP() if( ss.dht_nodes ) break; } - + if( generateOpt ) { Array params; params.push_back( generateOpt ); @@ -462,7 +462,7 @@ void ThreadWaitExtIP() torrentsToStart.insert(username); } } - + // add torrents from groups std::map::const_iterator j; for (j = m_groups.begin(); j != m_groups.end(); ++j) { @@ -539,10 +539,10 @@ void lockAndSaveUserData() int getDhtNodes(boost::int64_t *dht_global_nodes) { int dhtNodes = 0; - + if( dht_global_nodes ) *dht_global_nodes = 0; - + if( !DhtProxy::fEnabled ) { boost::shared_ptr ses(m_ses); if( ses ) { @@ -560,16 +560,16 @@ int getDhtNodes(boost::int64_t *dht_global_nodes) void torrentManualTrackerUpdate(const std::string &username) { - printf("torrentManualTrackerUpdate: updating torrent '%s'\n", + printf("torrentManualTrackerUpdate: updating torrent '%s'\n", username.c_str()); - + Array params; params.push_back(username); params.push_back("tracker"); params.push_back("m"); Array res = dhtget(params, false).get_array(); if( !res.size() ) { - printf("torrentManualTrackerUpdate: no tracker response for torrent '%s'\n", + printf("torrentManualTrackerUpdate: no tracker response for torrent '%s'\n", username.c_str()); } else { torrent_handle h = getTorrentUser(username); @@ -587,7 +587,7 @@ void torrentManualTrackerUpdate(const std::string &username) BOOST_FOREACH(const Pair& vitem, vDict) { if( vitem.name_ == "values" && vitem.value_.type() == array_type ) { Array values = vitem.value_.get_array(); - printf("torrentManualTrackerUpdate: tracker for '%s' returned %zd values\n", + printf("torrentManualTrackerUpdate: tracker for '%s' returned %zd values\n", username.c_str(), values.size()); for( size_t j = 0; j < values.size(); j++ ) { if( values.at(j).type() != str_type ) @@ -632,7 +632,7 @@ void ThreadMaintainDHTNodes() while(m_ses && !m_shuttingDownSession) { boost::shared_ptr ses(m_ses); - + session_status ss = ses->status(); int dht_nodes = ss.dht_nodes; bool nodesAdded = false; @@ -723,7 +723,7 @@ void ThreadMaintainDHTNodes() } } } - + // if dhtproxy is enabled we may need to manually obtain peer lists from trackers if( DhtProxy::fEnabled && !ses->is_paused() && GetTime() > lastManualTrackerUpdate + 60 ) { @@ -734,7 +734,7 @@ void ThreadMaintainDHTNodes() activeTorrents.push_back(item.first); } } - + BOOST_FOREACH(const std::string &username, activeTorrents) { if( m_shuttingDownSession ) break; @@ -768,7 +768,7 @@ void ThreadSessionAlerts() static map statusCheck; SimpleThreadCounter threadCounter(&cs_twister, &m_threadsToJoin, "session-alerts"); - + while(!m_ses && !m_shuttingDownSession) { MilliSleep(200); } @@ -834,7 +834,7 @@ void ThreadSessionAlerts() } else { // now we do our own search to make sure we are really close to this target sha1_hash ih = dhtTargetHash(n->string(), r->string(), t->string()); - + bool knownTorrent = false; { LOCK(cs_twister); @@ -936,10 +936,10 @@ void ThreadSessionAlerts() { boost::system::error_code ec; std::string extip = ei->external_address.to_string(ec); - + printf("Learned new external IP from DHT peers: %s\n", extip.c_str()); CNetAddr addrLocalHost(extip); - + // pretend it came from querying http server. try voting up to 10 times // to change current external ip in bitcoin code. for(int i=0; i < 10; i++) { @@ -1002,7 +1002,7 @@ void startSessionTorrent(boost::thread_group& threadGroup) m_noExpireResources["following"] = NumberedNoExpire; m_noExpireResources["status"] = SimpleNoExpire; m_noExpireResources["post"] = PostNoExpireRecent; - + DhtProxy::fEnabled = GetBoolArg("-dhtproxy", false); m_threadsToJoin = 0; @@ -1053,7 +1053,7 @@ void stopSessionTorrent() save_file(sesStatePath.string(), out); m_ses->stop_dht(); - + m_ses.reset(); } @@ -1253,7 +1253,7 @@ void registerNewGroup(const string &privKey, const string &desc, const string &m storeGroupDM(groupAlias,stoDM); } else { group.m_members.insert(member); - + if( m_users.count(member) && !m_users.at(member).m_ignoreGroups.count(groupAlias) ) { StoredDirectMsg stoDM; stoDM.m_fromMe = false; @@ -1440,6 +1440,23 @@ bool processReceivedDM(lazy_entry const* post) } else { storeNewDM(item.second.username, fromMe ? to : from, stoDM); } + + if (GetBoolArg("-websocket", false) && !fromMe) + { + Object dm; + + dm.push_back(Pair("type", isGroup ? "GROUP" : "DM")); + if (isGroup) + dm.push_back(Pair("group", item.second.username)); + dm.push_back(Pair("k", k)); + dm.push_back(Pair("time", utcTime)); + dm.push_back(Pair("from", from)); + dm.push_back(Pair("to", to)); + dm.push_back(Pair("msg", msg)); + + WriteToWS(dm); + } + break; } } @@ -1474,17 +1491,55 @@ void processReceivedPost(lazy_entry const &v, std::string &username, int64 time, LOCK(cs_twister); // mention of a local user && sent by someone we follow - if( m_users.count(mentionUser) && m_users[mentionUser].m_following.count(username) ) { + if( m_users.count(mentionUser) && m_users[mentionUser].m_following.count(username) ) + { std::string postKey = username + ";" + boost::lexical_cast(time); - if( m_users[mentionUser].m_mentionsKeys.count(postKey) == 0 ) { + if( m_users[mentionUser].m_mentionsKeys.count(postKey) == 0 ) + { m_users[mentionUser].m_mentionsKeys.insert(postKey); entry vEntry; vEntry = v; m_users[mentionUser].m_mentionsPosts.push_back(vEntry); + + if (GetBoolArg("-websocket", false)) + { + Object obj; + + obj.push_back(Pair("type", "mention")); + obj.push_back(Pair("from", username)); + obj.push_back(Pair("to", mentionUser)); + hexcapePost(vEntry); + obj.push_back(Pair("post", entryToJson(vEntry))); + + WriteToWS(obj); + } } } } } + + if (GetBoolArg("-websocket", false)) + { + entry vEntry; + vEntry = v; + for (map::const_iterator it = m_users.begin(); + it != m_users.end(); + ++it) + { + if (it->second.m_following.count(username)) + { + Object obj; + + obj.push_back(Pair("type", "post")); + obj.push_back(Pair("postboard", it->first)); + obj.push_back(Pair("from", username)); + hexcapePost(vEntry); + obj.push_back(Pair("post", entryToJson(vEntry))); + + WriteToWS(obj); + } + } + } } bool acceptSignedPost(char const *data, int data_size, std::string username, int seq, std::string &errmsg, boost::uint32_t *flags) @@ -1742,7 +1797,7 @@ bool createSignedUserpost(entry &v, std::string const &username, int k, bool createDirectMessage(entry &dm, std::string const &to, std::string const &msg) { CPubKey pubkey; - + /* try obtaining key from wallet first */ CKeyID keyID; if (pwalletMain->GetKeyIdFromUsername(to, keyID) && @@ -1894,7 +1949,7 @@ void updateSeenHashtags(std::string &message, int64_t msgTime) } } } - + if( hashtags.size() ) { boost::int64_t curTime = GetAdjustedTime(); if( msgTime > curTime ) msgTime = curTime; @@ -1917,13 +1972,13 @@ entry formatSpamPost(const string &msg, const string &username, uint64_t utcTime { entry v; entry &userpost = v["userpost"]; - + userpost["n"] = username; userpost["k"] = height ? height : 1; userpost["time"] = utcTime ? utcTime : GetAdjustedTime(); userpost["height"] = height ? height : getBestHeight(); userpost["msg"] = msg; - + v["sig_userpost"] = ""; return v; } @@ -2120,7 +2175,7 @@ Value dhtget(const Array& params, bool fHelp) alert_manager am(10, alert::dht_notification); sha1_hash ih = dhtTargetHash(strUsername,strResource,strMulti); - + vector dhtProxyNodes; if( !DhtProxy::fEnabled ) { dhtgetMapAdd(ih, &am); @@ -2637,7 +2692,7 @@ Value getposts(const Array& params, bool fHelp) v.type() == lazy_entry::dict_t) { lazy_entry const* post = v.dict_find_dict("userpost"); int64 time = post->dict_find_int_value("time",-1); - + if(time == -1 || time > GetAdjustedTime() + MAX_TIME_IN_FUTURE ) { printf("getposts: ignoring far-future message by '%s'\n", strUsername.c_str()); } @@ -2749,7 +2804,7 @@ Value getmentions(const Array& params, bool fHelp) if( i->name_ == "since_id" ) since_id = i->value_.get_int(); } } - + Array ret; LOCK(cs_twister); @@ -3243,12 +3298,12 @@ Value getspamposts(const Array& params, bool fHelp) Array ret; std::string lastMsg; - + for( int height = max_id; height > since_id && (int)ret.size() < count; height-- ) { CBlockIndex* pblockindex = FindBlockByHeight(height); CBlock block; ReadBlockFromDisk(block, pblockindex); - + const CTransaction &tx = block.vtx[0]; if( tx.IsSpamMessage() ) { std::string spamMessage = tx.message.ExtractPushDataString(0); @@ -3281,9 +3336,9 @@ Value torrentstatus(const Array& params, bool fHelp) if( !h.is_valid() ){ return Value(); } - + torrent_status status = h.status(); - + Object result; result.push_back(Pair("state", status.state)); result.push_back(Pair("paused", status.paused)); @@ -3302,13 +3357,13 @@ Value torrentstatus(const Array& params, bool fHelp) result.push_back(Pair("has_incoming", status.has_incoming)); result.push_back(Pair("priority", status.priority)); result.push_back(Pair("queue_position", status.queue_position)); - + Array peers; std::vector peerInfos; h.get_peer_info(peerInfos); BOOST_FOREACH(const peer_info &p, peerInfos) { Object info; - info.push_back(Pair("addr", p.ip.address().to_string() + ":" + + info.push_back(Pair("addr", p.ip.address().to_string() + ":" + boost::lexical_cast(p.ip.port()))); char flags[10]; sprintf(flags,"0x%x",p.flags); @@ -3324,7 +3379,7 @@ Value torrentstatus(const Array& params, bool fHelp) peers.push_back(info); } result.push_back(Pair("peers", peers)); - + return result; } @@ -3462,7 +3517,7 @@ lazy_entry const* TextSearch::matchRawMessage(string const &rawMessage, lazy_ent int pos; libtorrent::error_code ec; - if (lazy_bdecode(rawMessage.data(), rawMessage.data()+rawMessage.size(), v, ec, &pos) == 0 && + if (lazy_bdecode(rawMessage.data(), rawMessage.data()+rawMessage.size(), v, ec, &pos) == 0 && v.type() == lazy_entry::dict_t) { lazy_entry const* vv = v.dict_find_dict("v"); lazy_entry const* post = vv ? vv->dict_find_dict("userpost") : v.dict_find_dict("userpost"); @@ -3833,7 +3888,7 @@ Object getLibtorrentSessionStatus() boost::shared_ptr ses(m_ses); if( ses ) { session_status stats = ses->status(); - + obj.push_back( Pair("ext_addr_net2", stats.external_addr_v4) ); obj.push_back( Pair("dht_torrents", stats.dht_torrents) ); @@ -4189,7 +4244,7 @@ Value peekpost(const Array& params, bool fHelp) lazy_entry v; int pos; libtorrent::error_code ec; - if(pieces.size() && + if(pieces.size() && lazy_bdecode(pieces[0].data(), pieces[0].data()+pieces[0].size(), v, ec, &pos) == 0 && v.type() == lazy_entry::dict_t) { printf("peekpiece: got piece (%s,%d) from local torrent\n",strUsername.c_str(), k); @@ -4366,12 +4421,12 @@ Value newshorturl(const Array& params, bool fHelp) vch.resize(8); le32enc(&vch[0], res.get_int()); le32enc(&vch[4], k); - + string uid_k_64 = EncodeBase64(&vch[0], vch.size()); - + Array uriOptions; uriOptions.push_back(string("twist:")+uid_k_64); - + return uriOptions; } @@ -4396,7 +4451,7 @@ Value decodeshorturl(const Array& params, bool fHelp) if (uid_k_64.length() < 12) { throw JSONRPCError(RPC_PARSE_ERROR, "base64 string too small"); } - + string vch = DecodeBase64(uid_k_64); int uid = le32dec(&vch[0]); int k = le32dec(&vch[4]); @@ -4407,9 +4462,9 @@ Value decodeshorturl(const Array& params, bool fHelp) paramsSub.clear(); paramsSub.push_back(uid); res = uidtousername(paramsSub, false); - + string strUsername = res.get_str(); - + paramsSub.clear(); paramsSub.push_back(strUsername); paramsSub.push_back(k);