From e9c5b31f13b645b732692d9e3d526d1f77cf9f09 Mon Sep 17 00:00:00 2001 From: Intel Date: Mon, 5 May 2014 09:24:10 -0400 Subject: [PATCH 1/3] Enable Multithreading --- src/server/poolserver/Main.cpp | 5 +- src/server/poolserver/Server/Server.cpp | 175 ++++++------------------ src/server/poolserver/Server/Server.h | 26 ++-- 3 files changed, 59 insertions(+), 147 deletions(-) diff --git a/src/server/poolserver/Main.cpp b/src/server/poolserver/Main.cpp index 8f9b3c1..8b60fde 100644 --- a/src/server/poolserver/Main.cpp +++ b/src/server/poolserver/Main.cpp @@ -33,7 +33,7 @@ bool InitConfig(int argc, char *argv[]) // Server descServer.add_options() - ("MinDiffTime", boost::program_options::value()->default_value(100), "Minimum server diff time") + ("ServerThreads", boost::program_options::value()->default_value(2), "How many threads to use") ("MiningAddress", boost::program_options::value()->default_value("n1w8gkPXdNGb6edm4vujBn71A72eQFCNCw"), "Address to send coins to") ("BitcoinRPC", boost::program_options::value >()->multitoken(), "Bitcoin RPC login credentials") ; @@ -106,8 +106,7 @@ int main(int argc, char *argv[]) sLog.OpenLogFile(sConfig.Get("LogFilePath")); sLog.Info(LOG_GENERAL, "LogFile Started: %s", sLog.logFileLoc.c_str()); - boost::asio::io_service io; - Server* server = new Server(io); + Server* server = new Server(); int exitcode = server->Run(); delete server; diff --git a/src/server/poolserver/Server/Server.cpp b/src/server/poolserver/Server/Server.cpp index 9fd208f..2d82808 100644 --- a/src/server/poolserver/Server/Server.cpp +++ b/src/server/poolserver/Server/Server.cpp @@ -15,82 +15,31 @@ #include #include -Server::Server(asio::io_service& io) : serverLoops(0), io_service(io) +Server::Server() { + _io_service = boost::shared_ptr(new asio::io_service()); } Server::~Server() { - //delete stratumServer; + delete _stratumServer; + } int Server::Run() { sLog.Info(LOG_SERVER, "Server is starting..."); + // Connect to database InitDatabase(); - /*std::vector shares; - - sLog.Info(LOG_SERVER, "Loading shares..."); - - MySQL::QueryResult result = sDatabase.Query("SELECT MIN(`id`), MAX(`id`) FROM `shares`"); - MySQL::Field* fields = result->FetchRow(); - uint32 min = fields[0].Get(); - uint32 max = fields[1].Get(); - sLog.Info(LOG_SERVER, "Min: %u Max: %u", min, max); - - MySQL::PreparedStatement* stmt = sDatabase.GetPreparedStatement(STMT_QUERY_SHARES); - for (uint32 i = min; i <= max; i += 500000) - { - stmt->SetUInt32(0, i); - MySQL::QueryResult result2 = sDatabase.Query(stmt); - while (MySQL::Field* fields2 = result2->FetchRow()) { - shares.push_back(Share(fields2[0].Get(), fields2[1].Get(), 2981, fields2[2].Get())); - } - sLog.Info(LOG_SERVER, "Shares: %u", shares.size()); - } - - sLog.Info(LOG_SERVER, "Loaded %u shares", shares.size());*/ - - /*std::vector test = Util::ASCIIToBin("4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b"); - sLog.Info(LOG_SERVER, "Hash: %s", Util::BinToASCII(Crypto::SHA256D(test)).c_str()); - sLog.Info(LOG_SERVER, "RevHash: %s", Util::BinToASCII(Crypto::SHA256D(Util::Reverse(test))).c_str()); - - Bitcoin::Block block; - - ByteBuffer buf(Util::ASCIIToBin("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000")); - Bitcoin::Transaction trans; - buf >> trans; - sLog.Info(LOG_SERVER, "Version: %u", trans.version); - sLog.Info(LOG_SERVER, "Inputs: %u", trans.in.size()); - sLog.Info(LOG_SERVER, "PrevOut: %s", Util::BinToASCII(trans.in[0].prevout.hash).c_str()); - sLog.Info(LOG_SERVER, "PrevOutn: %u", trans.in[0].prevout.n); - sLog.Info(LOG_SERVER, "ScriptSig: %s", Util::BinToASCII(trans.in[0].script.script).c_str()); - sLog.Info(LOG_SERVER, "Inn: %u", trans.in[0].n); - sLog.Info(LOG_SERVER, "Outputs: %u", trans.out.size()); - sLog.Info(LOG_SERVER, "Value: %i", trans.out[0].value); - sLog.Info(LOG_SERVER, "PubSig: %s", Util::BinToASCII(trans.out[0].scriptPubKey.script).c_str()); - sLog.Info(LOG_SERVER, "LockTime: %u", trans.lockTime); - block.tx.resize(1); - block.tx[0] = trans; - block.BuildMerkleTree(); - sLog.Info(LOG_SERVER, "Hash: %s", Util::BinToASCII(block.merkleRootHash).c_str());*/ - - /*ByteBuffer buf(Util::ASCIIToBin("01000000010c432f4fb3e871a8bda638350b3d5c698cf431db8d6031b53e3fb5159e59d4a90000000000ffffffff0100f2052a010000001976a9143744841e13b90b4aca16fe793a7f88da3a23cc7188ac00000000")); + // Initialize share storage + DataMgr::Initialize(*_io_service); - Bitcoin::Transaction trans; - buf >> trans; - - ByteBuffer buf2; - buf2 << trans; - - sLog.Info(LOG_SERVER, "Trans: %s", Util::BinToASCII(buf2.vec).c_str());*/ - - - DataMgr::Initialize(io_service); - NetworkMgr::Initialize(io_service); + // Initialize bitcoin daemons + NetworkMgr::Initialize(*_io_service); + // Connect to bitcoin rpc std::vector btcrpc = sConfig.Get >("BitcoinRPC"); for (int i = 0; i < btcrpc.size(); ++i) { std::vector params = Util::Explode(btcrpc[i], ";"); @@ -107,87 +56,24 @@ int Server::Run() NetworkMgr::Instance()->Connect(coninfo); } - Stratum::Server srv(io_service); + // Init stratum + _stratumServer = new Stratum::Server(*_io_service); // Start stratum server tcp::endpoint endpoint(tcp::v4(), sConfig.Get("StratumPort")); - srv.Start(endpoint); - - io_service.run(); - - - //sDatabase.Execute("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')"); - //sDatabase.ExecuteAsync("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')"); - - /*MySQL::PreparedStatement* stmt = sDatabase.GetPreparedStatement(STMT_INSERT_SHIT); - stmt->SetUInt32(0, 10); - stmt->SetString(1, "hello"); - stmt->SetFloat(2, 5.987); - sDatabase.ExecuteAsync(stmt);*/ - - //MySQL::PreparedStatement* stmt = sDatabase.GetPreparedStatement(STMT_QUERY_TEST_TABLE); - //MySQL::QueryResult result = sDatabase.Query(stmt); + _stratumServer->Start(endpoint); + for (uint32 i = 0; i < sConfig.Get("ServerThreads"); ++i) { + _workerThreads.create_thread(boost::bind(&Server::WorkerThread, this, _io_service)); + } - //sDatabase.QueryAsync("SELECT * FROM `test_table`", &AsyncQueryCallback); - //MySQL::QueryResult result = sDatabase.Query("SELECT * FROM `test_table`"); - /*if (result) { - sLog.Info(LOG_SERVER, "Metadata: F: %u R: %u", result->GetFieldCount(), result->GetRowCount()); - while (MySQL::Field* fields = result->FetchRow()) { - sLog.Info(LOG_SERVER, "Row: %i %s", fields[0].GetUInt32(), fields[1].GetString().c_str()); - } - } else - sLog.Info(LOG_SERVER, "Empty result");*/ - - // Start stratum server - sLog.Info(LOG_SERVER, "Starting stratum"); - //stratumServer = new Stratum::Server(Config::GetString("STRATUM_IP"), Config::GetInt("STRATUM_PORT")); - - // Init loop vars - uint32_t sleepDuration = 0; - int exitcode = 0; - running = true; - - // Init diff - uint32_t minDiffTime = sConfig.Get("MinDiffTime"); - diffStart = boost::chrono::steady_clock::now(); - - sLog.Info(LOG_SERVER, "Server is running!"); - - while (running) - { - // Calc time diff - boost::chrono::steady_clock::time_point now = boost::chrono::steady_clock::now(); - uint32_t diff = boost::chrono::duration_cast(now - diffStart).count(); - diffStart = now; - - // Update - Update(diff); - - // Mercy for CPU - if (diff < minDiffTime+sleepDuration) { - sleepDuration = minDiffTime - diff + sleepDuration; - boost::this_thread::sleep(boost::posix_time::milliseconds(sleepDuration)); - } else - sleepDuration = 0; - - ++serverLoops; - - if (serverLoops > 50) - running = false; - //std::cout << "Diff: " << diff << ", Loop: " << serverLoops << std::endl; - } + _workerThreads.join_all(); sLog.Info(LOG_SERVER, "Server is stopping..."); sDatabase.Close(); - return exitcode; -} - -void Server::Update(uint32_t diff) -{ - + return 0; } bool Server::InitDatabase() @@ -206,3 +92,28 @@ bool Server::InitDatabase() } } +void Server::WorkerThread(boost::shared_ptr io_service) +{ + std::stringstream threadid; + threadid << boost::this_thread::get_id(); + + sLog.Info(LOG_SERVER, "Main server thread #%s started.", threadid.str().c_str()); + + while (true) + { + try + { + boost::system::error_code ec; + _io_service->run(ec); + if (ec) { + sLog.Error(LOG_SERVER, "IO error caught in main server thread #%s: %s.", threadid.str().c_str(), ec.message().c_str()); + } + break; + } catch(std::exception& e) { + sLog.Error(LOG_SERVER, "Exception caught in main server thread #%s: %s.", threadid.str().c_str(), e.what()); + } + } + + sLog.Info(LOG_SERVER, "Main server thread #%s stopped.", threadid.str().c_str()); +} + diff --git a/src/server/poolserver/Server/Server.h b/src/server/poolserver/Server/Server.h index 8965ad0..d8fba30 100644 --- a/src/server/poolserver/Server/Server.h +++ b/src/server/poolserver/Server/Server.h @@ -6,28 +6,30 @@ #include #include #include +#include #define SERVER_MIN_DIFF 100 class Server { public: - Server(asio::io_service& io); + Server(); ~Server(); - - Stratum::Server* stratumServer; - - void UploadShares(const boost::system::error_code& e); - boost::chrono::steady_clock::time_point diffStart; - bool running; - uint64_t serverLoops; - - int Run(); - void Update(uint32_t); + int Run(); bool InitDatabase(); - asio::io_service& io_service; + void WorkerThread(boost::shared_ptr io_service); + +private: + // Protocol servers + Stratum::Server* _stratumServer; + + // Main io service + boost::shared_ptr _io_service; + + // Main thread group + boost::thread_group _workerThreads; }; #endif From e22760e4db85c1f77d092eb57f2a49c7cb1b7e78 Mon Sep 17 00:00:00 2001 From: Intel Date: Mon, 5 May 2014 09:25:17 -0400 Subject: [PATCH 2/3] Fix packet handling --- src/server/poolserver/Stratum/Client.cpp | 25 ++++++++++++++---------- src/server/poolserver/Stratum/Client.h | 4 +++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/server/poolserver/Stratum/Client.cpp b/src/server/poolserver/Stratum/Client.cpp index 8a8f1ae..f217d4f 100644 --- a/src/server/poolserver/Stratum/Client.cpp +++ b/src/server/poolserver/Stratum/Client.cpp @@ -349,29 +349,34 @@ namespace Stratum { if (!error) { std::istream is(&_recvBuffer); - std::stringstream iss; - iss << is.rdbuf(); - try { - OnMessage(JSON::FromString(iss.str())); - } catch (std::exception& e) { - sLog.Error(LOG_SERVER, "Exception caught while parsing json: %s", e.what()); + char c; + while (is.good()) { + is >> c; + if (c == '\n') { + try { + OnMessage(JSON::FromString(_recvMessage)); + } catch (std::exception& e) { + sLog.Error(LOG_SERVER, "Exception caught while parsing json: %s", e.what()); + } + _recvMessage.clear(); + _recvMessage.reserve(PACKET_ALLOC); + } else + _recvMessage += c; } StartRead(); } else { // Client disconnected if ((error == asio::error::eof) || (error == asio::error::connection_reset)) { - _server->Disconnect(shared_from_this()); + Disconnect(); } } } void Client::_OnSend(const boost::system::error_code& error) { - if (!error) { - // Party - } else { + if (error) { // Client disconnected if ((error == asio::error::eof) || (error == asio::error::connection_reset)) { Disconnect(); diff --git a/src/server/poolserver/Stratum/Client.h b/src/server/poolserver/Stratum/Client.h index a2da229..9d4c274 100644 --- a/src/server/poolserver/Stratum/Client.h +++ b/src/server/poolserver/Stratum/Client.h @@ -15,7 +15,7 @@ #include #include -#define MAX_PACKET 2048 +#define PACKET_ALLOC 128 using namespace boost; using namespace boost::asio::ip; @@ -31,6 +31,7 @@ namespace Stratum { _diff = sConfig.Get("StratumMinDifficulty"); _minDiff = _diff; + _recvMessage.reserve(PACKET_ALLOC); } tcp::socket& GetSocket() @@ -130,6 +131,7 @@ namespace Stratum private: // Networking asio::streambuf _recvBuffer; + std::string _recvMessage; tcp::socket _socket; uint32 _ip; uint64 _id; From c06c4edb2adf5f8009ddbc90b2b02d6704207558 Mon Sep 17 00:00:00 2001 From: Intel Date: Mon, 5 May 2014 10:08:33 -0400 Subject: [PATCH 3/3] Fixed socket reading --- src/server/poolserver/Stratum/Client.cpp | 5 ++--- src/server/poolserver/Stratum/Client.h | 9 +++++++-- src/server/poolserver/Stratum/Server.h | 3 ++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/server/poolserver/Stratum/Client.cpp b/src/server/poolserver/Stratum/Client.cpp index f217d4f..4029dc2 100644 --- a/src/server/poolserver/Stratum/Client.cpp +++ b/src/server/poolserver/Stratum/Client.cpp @@ -17,6 +17,7 @@ namespace Stratum _ip = remote_ad.to_v4().to_ulong(); if (_server->IsBanned(_ip)) { + sLog.Warn(LOG_STRATUM, "Blocked banned client from: %s", remote_ad.to_v4().to_string().c_str()); Disconnect(); return; } @@ -349,10 +350,8 @@ namespace Stratum { if (!error) { std::istream is(&_recvBuffer); - char c; - while (is.good()) { - is >> c; + while (is.get(c)) { if (c == '\n') { try { OnMessage(JSON::FromString(_recvMessage)); diff --git a/src/server/poolserver/Stratum/Client.h b/src/server/poolserver/Stratum/Client.h index 9d4c274..00b7c6a 100644 --- a/src/server/poolserver/Stratum/Client.h +++ b/src/server/poolserver/Stratum/Client.h @@ -45,10 +45,10 @@ namespace Stratum void StartRead() { // Read until newline - boost::asio::async_read_until( + asio::async_read( _socket, _recvBuffer, - '\n', + asio::transfer_at_least(1), boost::bind(&Client::_OnReceive, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); } @@ -117,6 +117,11 @@ namespace Stratum return _id; } + uint32 GetIP() + { + return _ip; + } + void Ban(uint32 time); void Disconnect(); diff --git a/src/server/poolserver/Stratum/Server.h b/src/server/poolserver/Stratum/Server.h index 3fdfa63..a8a8917 100644 --- a/src/server/poolserver/Stratum/Server.h +++ b/src/server/poolserver/Stratum/Server.h @@ -100,6 +100,7 @@ namespace Stratum { client->CloseSocket(); _clients.erase(client); + sLog.Debug(LOG_STRATUM, "Stratum client disconnected from %s. Total clients: %u", asio::ip::address_v4(client->GetIP()).to_string().c_str(), _clients.size()); } void Ban(uint32 ip, uint64 time) @@ -133,9 +134,9 @@ namespace Stratum void _OnAccept(ClientPtr client, const boost::system::error_code& error) { if (!error) { - sLog.Debug(LOG_STRATUM, "New stratum client accepted. Total clients: %u", _clients.size()); client->Start(); _clients.insert(client); + sLog.Debug(LOG_STRATUM, "New stratum client accepted from %s. Total clients: %u", asio::ip::address_v4(client->GetIP()).to_string().c_str(), _clients.size()); } else { sLog.Debug(LOG_STRATUM, "Failed to accept stratum client"); }