From 0fccd298c6e690af87f18448fc2546bd33ad6e3b Mon Sep 17 00:00:00 2001 From: Intel Date: Sun, 4 May 2014 16:35:37 -0400 Subject: [PATCH] More work --- src/server/poolserver/DataMgr/DataMgr.cpp | 10 ++-- src/server/poolserver/DataMgr/DataMgr.h | 50 ++++++++++++++--- .../poolserver/NetworkMgr/NetworkMgr.cpp | 6 +- src/server/poolserver/Server/Server.cpp | 21 +------ src/server/poolserver/Server/Server.h | 1 - src/server/poolserver/Stratum/Client.cpp | 56 ++++++++++++++----- src/server/poolserver/Stratum/Client.h | 3 +- .../poolserver/Stratum/ShareLimiter.cpp | 17 +++++- src/server/poolserver/Stratum/ShareLimiter.h | 14 ++--- src/server/shared/MySQL/DatabaseConnection.h | 9 +++ src/server/shared/MySQL/DatabaseOperation.h | 13 ++++- src/server/shared/MySQL/DatabaseWorkerPool.h | 8 +++ src/server/shared/Util.cpp | 9 +-- src/server/shared/Util.h | 2 +- 14 files changed, 145 insertions(+), 74 deletions(-) diff --git a/src/server/poolserver/DataMgr/DataMgr.cpp b/src/server/poolserver/DataMgr/DataMgr.cpp index e1b7279..5d07b32 100644 --- a/src/server/poolserver/DataMgr/DataMgr.cpp +++ b/src/server/poolserver/DataMgr/DataMgr.cpp @@ -3,8 +3,9 @@ #include "Util.h" #include "Log.h" -template<> -void DataMgr::Upload() +DataMgr* DataMgr::singleton = 0; + +void DataMgr::Upload() { sLog.Info(LOG_SERVER, "We have %u shares", Size()); @@ -15,8 +16,6 @@ void DataMgr::Upload() std::string query("INSERT INTO `shares` (`rem_host`, `username`, `our_result`, `upstream_result`, `reason`, `time`, `difficulty`) VALUES "); for (int i = 0; i < BULK_COUNT; ++i) { - sLog.Info(LOG_SERVER, "Query: %s", query.c_str()); - Share share = Pop(); query += Util::FS("(INET_NTOA(%u), '%s', %u, 0, '%s', FROM_UNIXTIME(%u), %u)", share.host, share.username.c_str(), share.result, share.reason.c_str(), share.time, share.diff); @@ -28,9 +27,8 @@ void DataMgr::Upload() query += ','; } + sLog.Debug(LOG_SERVER, "Query: %s", query.c_str()); sDatabase.ExecuteAsync(query.c_str()); } } - -DataMgr sDataMgr; diff --git a/src/server/poolserver/DataMgr/DataMgr.h b/src/server/poolserver/DataMgr/DataMgr.h index dbcc32d..8b93878 100644 --- a/src/server/poolserver/DataMgr/DataMgr.h +++ b/src/server/poolserver/DataMgr/DataMgr.h @@ -3,6 +3,7 @@ #include #include +#include #include "Util.h" #include "Share.h" @@ -10,19 +11,38 @@ #define BULK_MIN 1 #define BULK_COUNT 50 -template +using namespace boost; + class DataMgr { + // Singleton +private: + static DataMgr* singleton; public: - DataMgr() {} - void Push(T data) + static DataMgr* Instance() + { + return singleton; + } + + static void Initialize(asio::io_service& io_service) + { + singleton = new DataMgr(io_service); + } + +public: + DataMgr(asio::io_service& io_service) : _io_service(io_service), _uploadTimer(io_service) + { + UploadTimerStart(); + } + + void Push(Share data) { boost::unique_lock lock(_datamutex); _datastore.push_back(data); } - T Pop() + Share Pop() { boost::unique_lock lock(_datamutex); Share share = _datastore.front(); @@ -36,12 +56,28 @@ public: return _datastore.size(); } + void UploadTimerStart() + { + _uploadTimer.expires_from_now(boost::posix_time::seconds(3)); + _uploadTimer.async_wait(boost::bind(&DataMgr::UploadTimerExpired, this, boost::asio::placeholders::error)); + } + + void UploadTimerExpired(const boost::system::error_code& /*e*/) + { + Upload(); + UploadTimerStart(); + } + void Upload(); private: + // io service + asio::io_service& _io_service; + + // timer + boost::asio::deadline_timer _uploadTimer; + boost::mutex _datamutex; - std::deque _datastore; + std::deque _datastore; }; -extern DataMgr sDataMgr; - #endif diff --git a/src/server/poolserver/NetworkMgr/NetworkMgr.cpp b/src/server/poolserver/NetworkMgr/NetworkMgr.cpp index 9412f13..a40fe21 100644 --- a/src/server/poolserver/NetworkMgr/NetworkMgr.cpp +++ b/src/server/poolserver/NetworkMgr/NetworkMgr.cpp @@ -147,7 +147,7 @@ void NetworkMgr::BlockCheck() void NetworkMgr::BlockCheckTimerStart() { - _blockCheckTimer.expires_from_now(boost::posix_time::seconds(3)); + _blockCheckTimer.expires_from_now(boost::posix_time::seconds(2)); _blockCheckTimer.async_wait(boost::bind(&NetworkMgr::BlockCheckTimerExpired, this, boost::asio::placeholders::error)); } @@ -161,12 +161,14 @@ void NetworkMgr::BlockCheckTimerExpired(const boost::system::error_code& /*e*/) void NetworkMgr::BlockNotifyBind(FBlockNotify f) { _blockNotifyBinds.push_back(f); + // Send block template + f(_curBlockTmpl, true); } // Block notify timer void NetworkMgr::BlockNotifyTimerStart() { - _blockNotifyTimer.expires_from_now(boost::posix_time::seconds(3)); + _blockNotifyTimer.expires_from_now(boost::posix_time::seconds(30)); _blockNotifyTimer.async_wait(boost::bind(&NetworkMgr::BlockNotifyTimerExpired, this, boost::asio::placeholders::error)); } diff --git a/src/server/poolserver/Server/Server.cpp b/src/server/poolserver/Server/Server.cpp index f20a44c..9fd208f 100644 --- a/src/server/poolserver/Server/Server.cpp +++ b/src/server/poolserver/Server/Server.cpp @@ -15,7 +15,7 @@ #include #include -Server::Server(asio::io_service& io) : serverLoops(0), io_service(io), uploadtimer(io) +Server::Server(asio::io_service& io) : serverLoops(0), io_service(io) { } @@ -24,19 +24,6 @@ Server::~Server() //delete stratumServer; } -void AsyncQueryCallback(MySQL::QueryResult result) -{ -} - - - -void Server::UploadShares(const boost::system::error_code& /*e*/) -{ - sDataMgr.Upload(); - uploadtimer.expires_from_now(boost::posix_time::seconds(3)); //repeat rate here - uploadtimer.async_wait(boost::bind(&Server::UploadShares, this, boost::asio::placeholders::error)); -} - int Server::Run() { sLog.Info(LOG_SERVER, "Server is starting..."); @@ -101,7 +88,7 @@ int Server::Run() sLog.Info(LOG_SERVER, "Trans: %s", Util::BinToASCII(buf2.vec).c_str());*/ - + DataMgr::Initialize(io_service); NetworkMgr::Initialize(io_service); std::vector btcrpc = sConfig.Get >("BitcoinRPC"); @@ -126,10 +113,6 @@ int Server::Run() tcp::endpoint endpoint(tcp::v4(), sConfig.Get("StratumPort")); srv.Start(endpoint); - // Start timer - uploadtimer.expires_from_now(boost::posix_time::seconds(3)); //repeat rate here - uploadtimer.async_wait(boost::bind(&Server::UploadShares, this, boost::asio::placeholders::error)); - io_service.run(); diff --git a/src/server/poolserver/Server/Server.h b/src/server/poolserver/Server/Server.h index bf270ac..8965ad0 100644 --- a/src/server/poolserver/Server/Server.h +++ b/src/server/poolserver/Server/Server.h @@ -27,7 +27,6 @@ public: bool InitDatabase(); - boost::asio::deadline_timer uploadtimer; asio::io_service& io_service; }; diff --git a/src/server/poolserver/Stratum/Client.cpp b/src/server/poolserver/Stratum/Client.cpp index 3473517..0834209 100644 --- a/src/server/poolserver/Stratum/Client.cpp +++ b/src/server/poolserver/Stratum/Client.cpp @@ -4,6 +4,7 @@ #include "DataMgr.h" #include "ShareLimiter.h" #include "Exception.h" +#include "ServerDatabaseEnv.h" #include namespace Stratum @@ -70,12 +71,28 @@ namespace Stratum return; } + // check username + std::string username = params[0].GetString(); + if (!_workers.count(username)) { + sLog.Error(LOG_SERVER, "Worker not authenticated"); + JSON response; + response["id"] = msg["id"]; + response["result"]; + response["error"].Add(int64(24)); + response["error"].Add("Unauthorized worker"); + response["error"].Add(JSON()); + SendMessage(response); + return; + } + uint32 jobid; ByteBuffer jobbuf(Util::ASCIIToBin(params[1].GetString())); jobbuf >> jobid; // Check if such job exists if (!_jobs.count(jobid)) { + DataMgr::Instance()->Push(Share(_ip, username, false, "Job not found", Util::Date(), 1)); + JSON response; response["id"] = msg["id"]; response["result"]; @@ -86,9 +103,6 @@ namespace Stratum return; } - // check username - std::string username = params[0].GetString(); - BinaryData extranonce2 = Util::ASCIIToBin(params[2].GetString()); if (extranonce2.size() != 4) { sLog.Error(LOG_SERVER, "Wrong extranonce size"); @@ -135,6 +149,8 @@ namespace Stratum share << extranonce2 << timebuf << noncebuf; if (!job.SubmitShare(share.Binary())) { sLog.Error(LOG_SERVER, "Duplicate share"); + DataMgr::Instance()->Push(Share(_ip, username, false, "Duplicate share", Util::Date(), job.diff)); + JSON response; response["id"] = msg["id"]; response["result"]; @@ -161,18 +177,12 @@ namespace Stratum // Set coinbase tx block.tx[0] = coinbasetx; - sLog.Info(LOG_SERVER, "Coinbase hash1: %s", Util::BinToASCII(Crypto::SHA256D(coinbasebuf.Binary())).c_str()); - sLog.Info(LOG_SERVER, "Coinbase hash2: %s", Util::BinToASCII(coinbasetx.GetHash()).c_str()); - // Rebuilds only left side of merkle tree block.RebuildMerkleTree(); - sLog.Info(LOG_SERVER, "Merklehash: %s", Util::BinToASCII(block.merkleRootHash).c_str()); // Get block hash BinaryData hash = block.GetHash(); - sLog.Info(LOG_SERVER, "Block hash: %s", Util::BinToASCII(hash).c_str()); - // Get block target BigInt target(Util::BinToASCII(Util::Reverse(hash)), 16); @@ -187,6 +197,8 @@ namespace Stratum // Check if difficulty meets job diff if (target > Bitcoin::DiffToTarget(job.diff)) { sLog.Error(LOG_SERVER, "Share above target"); + DataMgr::Instance()->Push(Share(_ip, username, false, "Share above target", Util::Date(), job.diff)); + JSON response; response["id"] = msg["id"]; response["result"]; @@ -203,7 +215,7 @@ namespace Stratum _server->SubmitBlock(block); } else { - sDataMgr.Push(Share(_ip, username, true, "", Util::Date(), job.diff)); + DataMgr::Instance()->Push(Share(_ip, username, true, "", Util::Date(), job.diff)); JSON response; response["id"] = msg["id"]; @@ -247,11 +259,25 @@ namespace Stratum std::string username = msg["params"][0].GetString(); std::string password = msg["params"][1].GetString(); - JSON response; - response["id"] = msg["id"].GetInt(); - response["error"]; - response["result"] = true; - SendMessage(response); + MySQL::QueryResult result = sDatabase.Query(Util::FS("SELECT * FROM `pool_worker` WHERE `username` = '%s' and `password` = '%s'", sDatabase.Escape(username).c_str(), sDatabase.Escape(password).c_str()).c_str()); + + if (result) { + _workers.insert(username); + + JSON response; + response["id"] = msg["id"].GetInt(); + response["error"]; + response["result"] = true; + SendMessage(response); + } else { + JSON response; + response["id"] = msg["id"].GetInt(); + response["error"].Add(int64(20)); + response["error"].Add("Authentication failed"); + response["error"].Add(JSON()); + response["result"] = true; + SendMessage(response); + } } Job Client::GetJob() diff --git a/src/server/poolserver/Stratum/Client.h b/src/server/poolserver/Stratum/Client.h index 5136396..d88f33a 100644 --- a/src/server/poolserver/Stratum/Client.h +++ b/src/server/poolserver/Stratum/Client.h @@ -13,6 +13,7 @@ #include #include #include +#include #define MAX_PACKET 2048 @@ -142,7 +143,7 @@ namespace Stratum Stratum::Server* _server; // Authorization - std::vector _workers; + std::set _workers; // Jobs bool _subscribed; diff --git a/src/server/poolserver/Stratum/ShareLimiter.cpp b/src/server/poolserver/Stratum/ShareLimiter.cpp index 567e0c5..184a5e2 100644 --- a/src/server/poolserver/Stratum/ShareLimiter.cpp +++ b/src/server/poolserver/Stratum/ShareLimiter.cpp @@ -1,6 +1,7 @@ #include "ShareLimiter.h" #include "Server.h" #include "Client.h" +#include "Log.h" namespace Stratum { @@ -15,23 +16,33 @@ namespace Stratum if (sinceLast < RETARGET_INTERVAL) return true; + _lastRetarget = curTime; + while (_shares.size() && (_shares.front() < curTime - RETARGET_TIME_BUFFER)) _shares.pop_front(); uint32 interval = std::min(curTime - _startTime, uint64(RETARGET_TIME_BUFFER)); // Calculate shares/min - double speed = (_shares.size()*60) / interval; + double speed = double(_shares.size()*60) / double(interval); // Calculate difference from pool target in % - double variance = (speed - RETARGET_SHARES_PER_MIN) / RETARGET_SHARES_PER_MIN; + double variance = speed / double(RETARGET_SHARES_PER_MIN); + + sLog.Info(LOG_SERVER, "Miner variance: %f speed: %f", variance, speed); // Check if we need to retarget - if (std::abs(variance)*100 < RETARGET_VARIANCE) + if (variance*100 < RETARGET_VARIANCE) return true; uint64 newDiff = double(_client->GetDifficulty()) * variance; + if (newDiff < 1) + newDiff = 1; + + if (newDiff > RETARGET_MAXDIFF) + newDiff = RETARGET_MAXDIFF; + _client->SetDifficulty(newDiff, true); return true; diff --git a/src/server/poolserver/Stratum/ShareLimiter.h b/src/server/poolserver/Stratum/ShareLimiter.h index 6e56da5..b65572e 100644 --- a/src/server/poolserver/Stratum/ShareLimiter.h +++ b/src/server/poolserver/Stratum/ShareLimiter.h @@ -6,29 +6,23 @@ #include -#define RETARGET_INTERVAL 60 +#define RETARGET_INTERVAL 20 #define RETARGET_TIME_BUFFER 60*5 #define RETARGET_SHARES_PER_MIN 15 #define RETARGET_VARIANCE 40 +#define RETARGET_MAXDIFF 1000000 namespace Stratum { class Client; - class ShareLimiterRecord - { - public: - ShareLimiterRecord(uint64 atime, uint64 adiff) : time(atime), diff(adiff) {} - uint64 time; - uint64 diff; - }; - class ShareLimiter { public: - ShareLimiter(Client* client) : _client(client), _lastRetarget(0) + ShareLimiter(Client* client) : _client(client) { _startTime = Util::Date(); + _lastRetarget = _startTime; } bool Submit(); diff --git a/src/server/shared/MySQL/DatabaseConnection.h b/src/server/shared/MySQL/DatabaseConnection.h index f6541ce..a9936b6 100644 --- a/src/server/shared/MySQL/DatabaseConnection.h +++ b/src/server/shared/MySQL/DatabaseConnection.h @@ -82,6 +82,15 @@ namespace MySQL return _stmts[index]; } + std::string Escape(std::string text) + { + char* buf = new char[text.length()*2 + 1]; + mysql_real_escape_string(_mysql, buf, text.c_str(), text.length()); + std::string result(buf); + delete buf; + return result; + } + private: bool _Query(const char *sql, MYSQL_RES** result, MYSQL_FIELD** fields, uint64& rowCount, uint32& fieldCount); bool _Query(PreparedStatement* stmt, MYSQL_RES** result, MYSQL_STMT** resultSTMT, uint32& fieldCount); diff --git a/src/server/shared/MySQL/DatabaseOperation.h b/src/server/shared/MySQL/DatabaseOperation.h index 904de00..ccc953a 100644 --- a/src/server/shared/MySQL/DatabaseOperation.h +++ b/src/server/shared/MySQL/DatabaseOperation.h @@ -40,13 +40,22 @@ namespace MySQL class DatabaseQueryOperation : public DatabaseOperation { public: - DatabaseQueryOperation(const char* query, DatabaseCallback callback = NULL): DatabaseOperation(), _query(query), _callback(callback) {}; + DatabaseQueryOperation(const char* query, DatabaseCallback callback = NULL): DatabaseOperation(), _callback(callback) + { + _query = new char[strlen(query)]; + strcpy(_query, const_cast(query)); + } + + ~DatabaseQueryOperation() + { + delete[] _query; + } void Execute(); private: DatabaseCallback _callback; - const char* _query; + char* _query; }; typedef Util::SynchronisedQueue DatabaseWorkQueue; diff --git a/src/server/shared/MySQL/DatabaseWorkerPool.h b/src/server/shared/MySQL/DatabaseWorkerPool.h index b096043..c8f1dc4 100644 --- a/src/server/shared/MySQL/DatabaseWorkerPool.h +++ b/src/server/shared/MySQL/DatabaseWorkerPool.h @@ -93,6 +93,14 @@ namespace MySQL return new PreparedStatement(stmtid); } + std::string Escape(std::string text) + { + DatabaseConnection* conn = GetSyncConnection(); + std::string result = conn->Escape(text); + conn->Unlock(); + return result; + } + private: DatabaseConnection* GetSyncConnection() { diff --git a/src/server/shared/Util.cpp b/src/server/shared/Util.cpp index bd7b251..24d8f73 100644 --- a/src/server/shared/Util.cpp +++ b/src/server/shared/Util.cpp @@ -18,14 +18,9 @@ std::string Util::Date(const char* format, bool utc) return ss.str(); } -uint32 Util::Date(bool utc) +uint32 Util::Date() { - boost::posix_time::ptime now; - if (utc) - now = boost::posix_time::second_clock::universal_time(); - else - now = boost::posix_time::second_clock::local_time(); - + boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time(); boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1)); boost::posix_time::time_duration diff = now - epoch; return diff.total_seconds(); diff --git a/src/server/shared/Util.h b/src/server/shared/Util.h index 9dfbee1..1c6a4fe 100644 --- a/src/server/shared/Util.h +++ b/src/server/shared/Util.h @@ -19,7 +19,7 @@ namespace Util { std::string Date(const char* format, bool utc = false); - uint32 Date(bool utc = true); + uint32 Date(); std::string FS(const char *str, ...); std::vector Explode(std::string input, std::string delim);