Browse Source

Merge pull request #423 from erqan/original

WebSocket...
miguelfreitas
miguelfreitas 7 years ago committed by GitHub
parent
commit
4d963ad587
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      .gitmodules
  2. 1
      Makefile.am
  3. 24
      configure.ac
  4. 387
      src/bitcoinrpc.cpp
  5. 4
      src/bitcoinrpc.h
  6. 8
      src/init.cpp
  7. 61
      src/twister.cpp
  8. 1
      websocketpp

3
.gitmodules vendored

@ -0,0 +1,3 @@ @@ -0,0 +1,3 @@
[submodule "websocketpp"]
path = websocketpp
url = https://github.com/zaphoyd/websocketpp.git

1
Makefile.am

@ -189,6 +189,7 @@ twisterd_LDADD = $(LIBLEVELDB) $(LIBMEMENV) \ @@ -189,6 +189,7 @@ twisterd_LDADD = $(LIBLEVELDB) $(LIBMEMENV) \
AM_CPPFLAGS = -ftemplate-depth-100 -DBOOST_SPIRIT_THREADSAFE -D_FILE_OFFSET_BITS=64 \
-I$(top_srcdir)/libtorrent/include \
-I$(top_srcdir)/websocketpp \
-I$(top_srcdir)/src \
-I$(top_srcdir)/src/leveldb/include -I$(top_srcdir)/src/leveldb/helpers \
@DEBUGFLAGS@ @BOOST_CPPFLAGS@ @OPENSSL_INCLUDES@ @DB_CXX_CPPFLAGS@

24
configure.ac

@ -420,6 +420,14 @@ AC_ARG_WITH( @@ -420,6 +420,14 @@ AC_ARG_WITH(
[[ARG_WITH_LIBICONV=no]]
)
AC_ARG_ENABLE(
[websocket],
[AS_HELP_STRING(
[--enable-websocket],
[enable websocket [default=no]])],
[[ARG_ENABLE_WS=$enableval]],
[[ARG_ENABLE_WS=no]]
)
###############################################################################
# Checking configure options
###############################################################################
@ -631,6 +639,20 @@ AS_CASE(["$ARG_ENABLE_RSS"], @@ -631,6 +639,20 @@ AS_CASE(["$ARG_ENABLE_RSS"],
AC_MSG_ERROR([Unknown option "$ARG_ENABLE_RSS". Use either "yes" or "no".])]
)
AC_MSG_CHECKING([whether websocket should be enabled])
AS_CASE(["$ARG_ENABLE_WS"],
["yes"|"on"], [
AC_MSG_RESULT([yes])
AC_DEFINE([ENABLE_WS],[1],[Enable WebSocket])
CXXFLAGS="$CXXFLAGS -DENABLE_WS "
],
["no"|"off"], [
AC_MSG_RESULT([no])
],
[AC_MSG_RESULT([$ARG_ENABLE_WS])
AC_MSG_ERROR([Unknown option "$ARG_ENABLE_WS". Use either "yes" or "no".])]
)
AS_ECHO
AS_ECHO "Checking for extra build files:"
@ -749,6 +771,7 @@ AM_CONDITIONAL([WITH_SHIPPED_GEOIP], [test "x$ARG_WITH_LIBGEOIP" = "xno" ]) @@ -749,6 +771,7 @@ AM_CONDITIONAL([WITH_SHIPPED_GEOIP], [test "x$ARG_WITH_LIBGEOIP" = "xno" ])
AM_CONDITIONAL([WITH_OPENSSL], [test "x$ARG_ENABLE_ENCRYPTION" = "xyes" -o "x$ARG_ENABLE_ENCRYPTION" = "xon" ])
AM_CONDITIONAL([USE_SSE2], [test "x$ARG_ENABLE_SSE2" = "xyes" -o "x$ARG_ENABLE_SSE2" = "xon" ])
AM_CONDITIONAL([ENABLE_RSS], [test "x$ARG_ENABLE_RSS" = "xyes" -o "x$ARG_ENABLE_RSS" = "xon" ])
AM_CONDITIONAL([ENABLE_WS], [test "x$ARG_ENABLE_WS" = "xyes" -o "x$ARG_ENABLE_WS" = "xon" ])
###############################################################################
# Other useful stuff
@ -852,6 +875,7 @@ Features: @@ -852,6 +875,7 @@ Features:
dht support: ${ARG_ENABLE_DHT:-yes}
pool allocators: ${ARG_ENABLE_POOL_ALLOC:-yes}
rss feed: ${ARG_ENABLE_RSS:-yes}
websocket: ${ARG_ENABLE_WS:-yes}
Extra builds:
examples: ${ARG_ENABLE_EXAMPLES:-no}

387
src/bitcoinrpc.cpp

@ -35,6 +35,31 @@ @@ -35,6 +35,31 @@
#include <fstream>
#include <streambuf>
#ifdef ENABLE_WS
#include <websocketpp/config/asio_no_tls_client.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/server.hpp>
#include <websocketpp/config/asio.hpp>
#include <websocketpp/config/asio_client.hpp>
typedef websocketpp::server<websocketpp::config::asio> wsserver;
typedef websocketpp::client<websocketpp::config::asio_client> wsclient;
typedef websocketpp::server<websocketpp::config::asio_tls> swsserver;
typedef websocketpp::client<websocketpp::config::asio_tls_client> swsclient;
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
typedef websocketpp::config::asio_client::message_type::ptr message_ptr;
typedef websocketpp::lib::shared_ptr<websocketpp::lib::asio::ssl::context> context_ptr;
wsserver wss;
swsserver swss;
vector<websocketpp::connection_hdl> wsconnections;
#endif // ENABLE_WS
using namespace std;
using namespace boost;
using namespace boost::asio;
@ -759,8 +784,129 @@ static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptor<Protocol> @@ -759,8 +784,129 @@ static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptor<Protocol>
}
}
#ifdef ENABLE_WS
template <class WST>
void on_message_server(WST *s, websocketpp::connection_hdl hdl, message_ptr msg);
template <class WST>
void on_connection(WST *s, websocketpp::connection_hdl hdl)
{
wsconnections.push_back(hdl);
s->send(hdl, "{\"result\":\"twister WEB Socket...\"}", websocketpp::frame::opcode::text);
}
template <class WST>
void on_close(WST *s, websocketpp::connection_hdl hdl)
{
typename WST::connection_ptr con = s->get_con_from_hdl(hdl);
vector<websocketpp::connection_hdl>::iterator it;
for (it = wsconnections.begin(); it != wsconnections.end(); ++it)
{
if (s->get_con_from_hdl(*it) == con)
{
wsconnections.erase(it);
break;
}
}
}
context_ptr on_tls_init(websocketpp::connection_hdl hdl)
{
namespace asio = websocketpp::lib::asio;
context_ptr ctx = websocketpp::lib::make_shared<asio::ssl::context>(asio::ssl::context::sslv23);
try
{
ctx->set_options(asio::ssl::context::default_workarounds |
asio::ssl::context::no_sslv2 |
asio::ssl::context::no_sslv3 |
asio::ssl::context::single_dh_use);
ctx->use_certificate_chain_file(GetArg("-rpcsslcertificatechainfile", "server.cert"));
ctx->use_private_key_file(GetArg("-rpcsslprivatekeyfile", "server.pem"), asio::ssl::context::pem);
std::string ciphers = GetArg("-rpcsslciphers", "TLSv1+HIGH:!SSLv2:!aNULL:!eNULL:!AH:!3DES:@STRENGTH");
if (SSL_CTX_set_cipher_list(ctx->native_handle() , ciphers.c_str()) != 1)
std::cout << "Error setting cipher list" << std::endl;
}
catch (std::exception& e)
{
std::cout << "Exception: " << e.what() << std::endl;
}
return ctx;
}
void set_ws_tls_handler(swsserver &ws)
{
ws.set_tls_init_handler(websocketpp::lib::bind(&on_tls_init, ::_1));
}
void set_ws_tls_handler(wsserver &ws){}
void set_ws_tls_handler(swsclient &ws)
{
ws.set_tls_init_handler(websocketpp::lib::bind(&on_tls_init, ::_1));
}
void set_ws_tls_handler(wsclient &ws){}
template <class WST>
void StartWSServer(WST &ws)
{
//wsserver ws;
try {
// Set logging settings
//ws.set_access_channels(websocketpp::log::alevel::all);
//ws.clear_access_channels(websocketpp::log::alevel::frame_payload);
ws.set_access_channels(websocketpp::log::alevel::none);
ws.clear_access_channels(websocketpp::log::alevel::all);
// Initialize Asio
ws.init_asio();
// Register our message handler
ws.set_message_handler(websocketpp::lib::bind(&on_message_server<WST>, &ws, ::_1, ::_2));
ws.set_open_handler(websocketpp::lib::bind(&on_connection<WST>, &ws, ::_1));
ws.set_close_handler(websocketpp::lib::bind(&on_close<WST>, &ws, ::_1));
set_ws_tls_handler(ws);
// Listen
ws.listen(GetArg("-wsport", GetArg("-rpcport", Params().RPCPort()) + 1000));
// Start the server accept loop
ws.start_accept();
// Start the ASIO io_service run loop
ws.run();
}
catch (websocketpp::exception const & e)
{
std::cout << e.what() << std::endl;
}
catch (...)
{
std::cout << "other exception" << std::endl;
}
}
#endif // ENABLE_WS
void StartRPCThreads()
{
const bool fUseSSL = GetBoolArg("-rpcssl", false);
#ifdef ENABLE_WS
if (GetBoolArg("-websocket", false))
{
if (fUseSSL)
boost::thread wst(boost::bind(&StartWSServer<swsserver>, boost::ref<swsserver>(swss)));
else
boost::thread wst(boost::bind(&StartWSServer<wsserver>, boost::ref<wsserver>(wss)));
}
#endif // ENABLE_WS
strRPCUserColonPass = mapArgs["-rpcuser"] + ":" + mapArgs["-rpcpassword"];
if (((mapArgs["-rpcpassword"] == "") ||
(mapArgs["-rpcuser"] == mapArgs["-rpcpassword"])) && Params().RequireRPCPassword())
@ -796,8 +942,6 @@ void StartRPCThreads() @@ -796,8 +942,6 @@ void StartRPCThreads()
rpc_io_service = new asio::io_service();
rpc_ssl_context = new ssl::context(ssl::context::sslv23);
const bool fUseSSL = GetBoolArg("-rpcssl", false);
if (fUseSSL)
{
rpc_ssl_context->set_options(ssl::context::no_sslv2);
@ -907,7 +1051,7 @@ void RPCRunLater(const std::string& name, boost::function<void(void)> func, int6 @@ -907,7 +1051,7 @@ void RPCRunLater(const std::string& name, boost::function<void(void)> func, int6
boost::shared_ptr<deadline_timer>(new deadline_timer(*rpc_io_service))));
}
deadlineTimers[name]->expires_from_now(posix_time::seconds(nSeconds));
deadlineTimers[name]->async_wait(boost::bind(RPCRunHandler, _1, func));
deadlineTimers[name]->async_wait(boost::bind(RPCRunHandler, boost::placeholders::_1, func));
}
@ -1175,9 +1319,156 @@ json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_s @@ -1175,9 +1319,156 @@ json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_s
}
}
#ifdef ENABLE_WS
template <class WST>
void on_message_client(WST* c, websocketpp::connection_hdl hdl, message_ptr msg)
{
string strPrint;
int nRet = 0;
try
{
// Parse reply
Value valReply;
if (!read_string(msg->get_payload(), valReply))
throw runtime_error("couldn't parse reply from server");
const Object& reply = valReply.get_obj();
if (reply.empty())
throw runtime_error("expected reply to have result, error and id properties");
// Parse reply
const Value& result = find_value(reply, "result");
const Value& error = find_value(reply, "error");
if (error.type() != null_type)
{
// Error
strPrint = "error: " + write_string(error, false);
int code = find_value(error.get_obj(), "code").get_int();
nRet = abs(code);
}
else
{
// Result
if (result.type() == null_type)
strPrint = "";
else if (result.type() == str_type)
strPrint = result.get_str();
else
strPrint = write_string(result, true);
}
}
catch (boost::thread_interrupted)
{
throw;
}
catch (std::exception& e) {
strPrint = string("error: ") + e.what();
nRet = 87;
}
catch (...)
{
PrintException(NULL, "CommandLineRPC()");
}
if (strPrint != "")
{
fprintf((nRet == 0 ? stdout : stderr), "%s\n", strPrint.c_str());
}
}
template <class WST>
Object StartWSClient(bool fUseSSL)
{
WST c;
stringstream ssuri;
ssuri << "ws" << (fUseSSL ? "s" : "") <<"://" << GetArg("-rpcconnect", "127.0.0.1") << ":" << GetArg("-wsport", GetArg("-rpcport", Params().RPCPort()) + 1000);
try
{
// Set logging to be pretty verbose (everything except message payloads)
//c.set_access_channels(websocketpp::log::alevel::all);
//c.clear_access_channels(websocketpp::log::alevel::frame_payload);
c.set_access_channels(websocketpp::log::alevel::none);
c.clear_access_channels(websocketpp::log::alevel::all);
// Initialize ASIO
c.init_asio();
//c.start_perpetual();
boost::thread wsth(boost::bind(&WST::run, &c));
// Register our message handler
c.set_message_handler(websocketpp::lib::bind(&on_message_client<WST>, &c, ::_1, ::_2));
set_ws_tls_handler(c);
websocketpp::lib::error_code ec;
typename WST::connection_ptr con = c.get_connection(ssuri.str(), ec);
if (ec) {
std::cout << "could not create connection because: " << ec.message() << std::endl;
Object reply;
reply.push_back(Pair("error", ec.message()));
return reply;
}
// Note that connect here only requests a connection. No network messages are
// exchanged until the event loop starts running in the next line.
c.connect(con);
// Start the ASIO io_service run loop
// this will cause a single connection to be made to the server. c.run()
// will exit when this connection is closed.
//c.run();
string strCommand;
while (strCommand != "closewebsocket" && strCommand != "stop")
{
getline(cin, strCommand);
vector<string> tokens;
boost::algorithm::split(tokens, strCommand, boost::algorithm::is_any_of(" "));
// Method
if (tokens.size() < 1)
continue;
vector<string>::iterator it = tokens.begin();
string method = *it;
// Parameters default to strings
vector<string> strParams(++it, tokens.end());
Array params = RPCConvertValues(method, strParams);
string strRequest = JSONRPCRequest(method, params, 1);
c.send(con->get_handle(), strRequest, websocketpp::frame::opcode::text);
}
}
catch (websocketpp::exception const & e)
{
std::cout << e.what() << std::endl;
Object reply;
reply.push_back(Pair("error", e.what()));
return reply;
}
Object reply;
reply.push_back(Pair("result", "OK"));
return reply;
}
#endif // ENABLE_WS
Object CallRPC(const string& strMethod, const Array& params)
{
bool fUseSSL = GetBoolArg("-rpcssl", false);
#ifdef ENABLE_WS
if (strMethod == "openwebsocket")
{
if (fUseSSL)
return StartWSClient<swsclient>(fUseSSL);
else
return StartWSClient<wsclient>(fUseSSL);
}
#endif // ENABLE_WS
if (mapArgs["-rpcuser"] == "" && mapArgs["-rpcpassword"] == "")
throw runtime_error(strprintf(
_("You must set rpcpassword=<password> in the configuration file:\n%s\n"
@ -1185,7 +1476,6 @@ Object CallRPC(const string& strMethod, const Array& params) @@ -1185,7 +1476,6 @@ Object CallRPC(const string& strMethod, const Array& params)
GetConfigFile().string().c_str()));
// Connect to localhost
bool fUseSSL = GetBoolArg("-rpcssl", false);
asio::io_service io_service;
ssl::context context(ssl::context::sslv23);
context.set_options(ssl::context::no_sslv2);
@ -1445,8 +1735,97 @@ int CommandLineRPC(int argc, char *argv[]) @@ -1445,8 +1735,97 @@ int CommandLineRPC(int argc, char *argv[])
return nRet;
}
#ifdef ENABLE_WS
template <class WST>
void on_message_server(WST* s, websocketpp::connection_hdl hdl, message_ptr msg)
{
/*
std::cout << "on_message called with hdl: " << hdl.lock().get()
<< " and message: " << msg->get_payload()
<< std::endl;
*/
// check for a special command to instruct the server to stop listening so
// it can be cleanly exited.
if (msg->get_payload() == "closewebsocket")
{
s->stop_listening();
return;
}
JSONRequest jreq;
string strReply;
try
{
// Parse request
Value valRequest;
if (!read_string(msg->get_payload(), valRequest))
throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");
// singleton request
if (valRequest.type() == obj_type)
{
jreq.parse(valRequest);
Value result = tableRPC.execute(jreq.strMethod, jreq.params);
// Send reply
strReply = JSONRPCReply(result, Value::null, jreq.id);
if (jreq.strMethod == "stop")
s->stop_listening();
}
else if (valRequest.type() == array_type)
strReply = JSONRPCExecBatch(valRequest.get_array());
else
throw JSONRPCError(RPC_PARSE_ERROR, "Top-level object parse error");
}
catch (Object& objError)
{
strReply = JSONRPCReply(Value::null, objError, jreq.id);
}
catch (std::exception& e)
{
strReply = e.what();
}
try
{
s->send(hdl, strReply, msg->get_opcode());
}
catch (const websocketpp::lib::error_code& e)
{
std::cout << "Echo failed because: " << e
<< "(" << e.message() << ")" << std::endl;
}
}
void WriteToWS(Value const& val)
{
string strPrint;
bool fUseSSL = GetBoolArg("-rpcssl", false);
if (val.type() == null_type)
return;
else if (val.type() == str_type)
strPrint = val.get_str();
else
strPrint = write_string(val, false);
for (vector<websocketpp::connection_hdl>::iterator it = wsconnections.begin(); it != wsconnections.end(); ++it)
{
websocketpp::lib::error_code ec;
if (fUseSSL)
swss.send(*it, strPrint, websocketpp::frame::opcode::text, ec);
else
wss.send(*it, strPrint, websocketpp::frame::opcode::text, ec);
if (ec)
{
cout << "ERROR : " << ec.message() << endl << ec << endl;
}
}
}
#endif // ENABLE_WS
#ifdef TEST
int main(int argc, char *argv[])

4
src/bitcoinrpc.h

@ -74,6 +74,10 @@ void StartRPCThreads(); @@ -74,6 +74,10 @@ void StartRPCThreads();
void StopRPCThreads();
int CommandLineRPC(int argc, char *argv[]);
#ifdef ENABLE_WS
void WriteToWS(json_spirit::Value const& val);
#endif // ENABLE_WS
/** Convert parameter values for RPC call from strings to command-specific JSON objects. */
json_spirit::Array RPCConvertValues(const std::string &strMethod, const std::vector<std::string> &strParams);

8
src/init.cpp

@ -238,6 +238,10 @@ std::string HelpMessage() @@ -238,6 +238,10 @@ std::string HelpMessage()
if (!fHaveGUI)
strUsage += " -rpcconnect=<ip> " + _("Send commands to node running on <ip> (default: 127.0.0.1)") + "\n";
strUsage += " -rpcthreads=<n> " + _("Set the number of threads to service RPC calls (default: 10)") + "\n";
#ifdef ENABLE_WS
strUsage += " -websocket " + _("Enables WEB Socket connections (default: 0)") + "\n";
strUsage += " -wsport=<port> " + _("Listen for WEB Socket on <port> (default: rpcport+1000)") + "\n";
#endif // ENABLE_WS
strUsage += " -public_server_mode " + _("Limit JSON-RPC execution to public/safe commands only.") + "\n";
strUsage += " -blocknotify=<cmd> " + _("Execute command when the best block changes (%s in cmd is replaced by block hash)") + "\n";
strUsage += " -walletnotify=<cmd> " + _("Execute command when a wallet transaction changes (%s in cmd is replaced by TxID)") + "\n";
@ -259,7 +263,11 @@ std::string HelpMessage() @@ -259,7 +263,11 @@ std::string HelpMessage()
strUsage += " -blockprioritysize=<n> " + _("Set maximum size of high-priority/low-fee transactions in bytes (default: 27000)") + "\n";
strUsage += "\n"; _("SSL options: (see the Bitcoin Wiki for SSL setup instructions)") + "\n";
#ifdef ENABLE_WS
strUsage += " -rpcssl " + _("Use OpenSSL (https) for JSON-RPC and/or WEB Socket connections") + "\n";
#else
strUsage += " -rpcssl " + _("Use OpenSSL (https) for JSON-RPC connections") + "\n";
#endif // ENABLE_WS
strUsage += " -rpcsslcertificatechainfile=<file.cert> " + _("Server certificate file (default: server.cert)") + "\n";
strUsage += " -rpcsslprivatekeyfile=<file.pem> " + _("Server private key (default: server.pem)") + "\n";
strUsage += " -rpcsslciphers=<ciphers> " + _("Acceptable ciphers (default: TLSv1+HIGH:!SSLv2:!aNULL:!eNULL:!AH:!3DES:@STRENGTH)") + "\n";

61
src/twister.cpp

@ -1440,6 +1440,23 @@ bool processReceivedDM(lazy_entry const* post) @@ -1440,6 +1440,23 @@ bool processReceivedDM(lazy_entry const* post)
} else {
storeNewDM(item.second.username, fromMe ? to : from, stoDM);
}
#ifdef ENABLE_WS
if (GetBoolArg("-websocket", false) && !fromMe)
{
Object dm;
dm.push_back(Pair("type", isGroup ? "GROUP" : "DM"));
if (isGroup)
dm.push_back(Pair("group", item.second.username));
dm.push_back(Pair("k", k));
dm.push_back(Pair("time", utcTime));
dm.push_back(Pair("from", from));
dm.push_back(Pair("to", to));
dm.push_back(Pair("msg", msg));
WriteToWS(dm);
}
#endif // ENABLE_WS
break;
}
}
@ -1474,17 +1491,57 @@ void processReceivedPost(lazy_entry const &v, std::string &username, int64 time, @@ -1474,17 +1491,57 @@ void processReceivedPost(lazy_entry const &v, std::string &username, int64 time,
LOCK(cs_twister);
// mention of a local user && sent by someone we follow
if( m_users.count(mentionUser) && m_users[mentionUser].m_following.count(username) ) {
if( m_users.count(mentionUser) && m_users[mentionUser].m_following.count(username) )
{
std::string postKey = username + ";" + boost::lexical_cast<std::string>(time);
if( m_users[mentionUser].m_mentionsKeys.count(postKey) == 0 ) {
if( m_users[mentionUser].m_mentionsKeys.count(postKey) == 0 )
{
m_users[mentionUser].m_mentionsKeys.insert(postKey);
entry vEntry;
vEntry = v;
m_users[mentionUser].m_mentionsPosts.push_back(vEntry);
#ifdef ENABLE_WS
if (GetBoolArg("-websocket", false))
{
Object obj;
obj.push_back(Pair("type", "mention"));
obj.push_back(Pair("from", username));
obj.push_back(Pair("to", mentionUser));
hexcapePost(vEntry);
obj.push_back(Pair("post", entryToJson(vEntry)));
WriteToWS(obj);
}
#endif // ENABLE_WS
}
}
}
}
#ifdef ENABLE_WS
if (GetBoolArg("-websocket", false))
{
entry vEntry;
vEntry = v;
for (map<string, UserData>::const_iterator it = m_users.begin();
it != m_users.end();
++it)
{
if (it->second.m_following.count(username))
{
Object obj;
obj.push_back(Pair("type", "post"));
obj.push_back(Pair("postboard", it->first));
obj.push_back(Pair("from", username));
hexcapePost(vEntry);
obj.push_back(Pair("post", entryToJson(vEntry)));
WriteToWS(obj);
}
}
}
#endif // ENABLE_WS
}
bool acceptSignedPost(char const *data, int data_size, std::string username, int seq, std::string &errmsg, boost::uint32_t *flags)

1
websocketpp

@ -0,0 +1 @@ @@ -0,0 +1 @@
Subproject commit 378437aecdcb1dfe62096ffd5d944bf1f640ccc3
Loading…
Cancel
Save