Browse Source

More work

peercoin
Intel 10 years ago
parent
commit
0fccd298c6
  1. 10
      src/server/poolserver/DataMgr/DataMgr.cpp
  2. 50
      src/server/poolserver/DataMgr/DataMgr.h
  3. 6
      src/server/poolserver/NetworkMgr/NetworkMgr.cpp
  4. 21
      src/server/poolserver/Server/Server.cpp
  5. 1
      src/server/poolserver/Server/Server.h
  6. 56
      src/server/poolserver/Stratum/Client.cpp
  7. 3
      src/server/poolserver/Stratum/Client.h
  8. 17
      src/server/poolserver/Stratum/ShareLimiter.cpp
  9. 14
      src/server/poolserver/Stratum/ShareLimiter.h
  10. 9
      src/server/shared/MySQL/DatabaseConnection.h
  11. 13
      src/server/shared/MySQL/DatabaseOperation.h
  12. 8
      src/server/shared/MySQL/DatabaseWorkerPool.h
  13. 9
      src/server/shared/Util.cpp
  14. 2
      src/server/shared/Util.h

10
src/server/poolserver/DataMgr/DataMgr.cpp

@ -3,8 +3,9 @@ @@ -3,8 +3,9 @@
#include "Util.h"
#include "Log.h"
template<>
void DataMgr<Share>::Upload()
DataMgr* DataMgr::singleton = 0;
void DataMgr::Upload()
{
sLog.Info(LOG_SERVER, "We have %u shares", Size());
@ -15,8 +16,6 @@ void DataMgr<Share>::Upload() @@ -15,8 +16,6 @@ void DataMgr<Share>::Upload()
std::string query("INSERT INTO `shares` (`rem_host`, `username`, `our_result`, `upstream_result`, `reason`, `time`, `difficulty`) VALUES ");
for (int i = 0; i < BULK_COUNT; ++i)
{
sLog.Info(LOG_SERVER, "Query: %s", query.c_str());
Share share = Pop();
query += Util::FS("(INET_NTOA(%u), '%s', %u, 0, '%s', FROM_UNIXTIME(%u), %u)", share.host, share.username.c_str(), share.result, share.reason.c_str(), share.time, share.diff);
@ -28,9 +27,8 @@ void DataMgr<Share>::Upload() @@ -28,9 +27,8 @@ void DataMgr<Share>::Upload()
query += ',';
}
sLog.Debug(LOG_SERVER, "Query: %s", query.c_str());
sDatabase.ExecuteAsync(query.c_str());
}
}
DataMgr<Share> sDataMgr;

50
src/server/poolserver/DataMgr/DataMgr.h

@ -3,6 +3,7 @@ @@ -3,6 +3,7 @@
#include <deque>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include "Util.h"
#include "Share.h"
@ -10,19 +11,38 @@ @@ -10,19 +11,38 @@
#define BULK_MIN 1
#define BULK_COUNT 50
template<class T>
using namespace boost;
class DataMgr
{
// Singleton
private:
static DataMgr* singleton;
public:
DataMgr() {}
void Push(T data)
static DataMgr* Instance()
{
return singleton;
}
static void Initialize(asio::io_service& io_service)
{
singleton = new DataMgr(io_service);
}
public:
DataMgr(asio::io_service& io_service) : _io_service(io_service), _uploadTimer(io_service)
{
UploadTimerStart();
}
void Push(Share data)
{
boost::unique_lock<boost::mutex> lock(_datamutex);
_datastore.push_back(data);
}
T Pop()
Share Pop()
{
boost::unique_lock<boost::mutex> lock(_datamutex);
Share share = _datastore.front();
@ -36,12 +56,28 @@ public: @@ -36,12 +56,28 @@ public:
return _datastore.size();
}
void UploadTimerStart()
{
_uploadTimer.expires_from_now(boost::posix_time::seconds(3));
_uploadTimer.async_wait(boost::bind(&DataMgr::UploadTimerExpired, this, boost::asio::placeholders::error));
}
void UploadTimerExpired(const boost::system::error_code& /*e*/)
{
Upload();
UploadTimerStart();
}
void Upload();
private:
// io service
asio::io_service& _io_service;
// timer
boost::asio::deadline_timer _uploadTimer;
boost::mutex _datamutex;
std::deque<T> _datastore;
std::deque<Share> _datastore;
};
extern DataMgr<Share> sDataMgr;
#endif

6
src/server/poolserver/NetworkMgr/NetworkMgr.cpp

@ -147,7 +147,7 @@ void NetworkMgr::BlockCheck() @@ -147,7 +147,7 @@ void NetworkMgr::BlockCheck()
void NetworkMgr::BlockCheckTimerStart()
{
_blockCheckTimer.expires_from_now(boost::posix_time::seconds(3));
_blockCheckTimer.expires_from_now(boost::posix_time::seconds(2));
_blockCheckTimer.async_wait(boost::bind(&NetworkMgr::BlockCheckTimerExpired, this, boost::asio::placeholders::error));
}
@ -161,12 +161,14 @@ void NetworkMgr::BlockCheckTimerExpired(const boost::system::error_code& /*e*/) @@ -161,12 +161,14 @@ void NetworkMgr::BlockCheckTimerExpired(const boost::system::error_code& /*e*/)
void NetworkMgr::BlockNotifyBind(FBlockNotify f)
{
_blockNotifyBinds.push_back(f);
// Send block template
f(_curBlockTmpl, true);
}
// Block notify timer
void NetworkMgr::BlockNotifyTimerStart()
{
_blockNotifyTimer.expires_from_now(boost::posix_time::seconds(3));
_blockNotifyTimer.expires_from_now(boost::posix_time::seconds(30));
_blockNotifyTimer.async_wait(boost::bind(&NetworkMgr::BlockNotifyTimerExpired, this, boost::asio::placeholders::error));
}

21
src/server/poolserver/Server/Server.cpp

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
#include <iostream>
#include <algorithm>
Server::Server(asio::io_service& io) : serverLoops(0), io_service(io), uploadtimer(io)
Server::Server(asio::io_service& io) : serverLoops(0), io_service(io)
{
}
@ -24,19 +24,6 @@ Server::~Server() @@ -24,19 +24,6 @@ Server::~Server()
//delete stratumServer;
}
void AsyncQueryCallback(MySQL::QueryResult result)
{
}
void Server::UploadShares(const boost::system::error_code& /*e*/)
{
sDataMgr.Upload();
uploadtimer.expires_from_now(boost::posix_time::seconds(3)); //repeat rate here
uploadtimer.async_wait(boost::bind(&Server::UploadShares, this, boost::asio::placeholders::error));
}
int Server::Run()
{
sLog.Info(LOG_SERVER, "Server is starting...");
@ -101,7 +88,7 @@ int Server::Run() @@ -101,7 +88,7 @@ int Server::Run()
sLog.Info(LOG_SERVER, "Trans: %s", Util::BinToASCII(buf2.vec).c_str());*/
DataMgr::Initialize(io_service);
NetworkMgr::Initialize(io_service);
std::vector<std::string> btcrpc = sConfig.Get<std::vector<std::string> >("BitcoinRPC");
@ -126,10 +113,6 @@ int Server::Run() @@ -126,10 +113,6 @@ int Server::Run()
tcp::endpoint endpoint(tcp::v4(), sConfig.Get<uint16>("StratumPort"));
srv.Start(endpoint);
// Start timer
uploadtimer.expires_from_now(boost::posix_time::seconds(3)); //repeat rate here
uploadtimer.async_wait(boost::bind(&Server::UploadShares, this, boost::asio::placeholders::error));
io_service.run();

1
src/server/poolserver/Server/Server.h

@ -27,7 +27,6 @@ public: @@ -27,7 +27,6 @@ public:
bool InitDatabase();
boost::asio::deadline_timer uploadtimer;
asio::io_service& io_service;
};

56
src/server/poolserver/Stratum/Client.cpp

@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
#include "DataMgr.h"
#include "ShareLimiter.h"
#include "Exception.h"
#include "ServerDatabaseEnv.h"
#include <iostream>
namespace Stratum
@ -70,12 +71,28 @@ namespace Stratum @@ -70,12 +71,28 @@ namespace Stratum
return;
}
// check username
std::string username = params[0].GetString();
if (!_workers.count(username)) {
sLog.Error(LOG_SERVER, "Worker not authenticated");
JSON response;
response["id"] = msg["id"];
response["result"];
response["error"].Add(int64(24));
response["error"].Add("Unauthorized worker");
response["error"].Add(JSON());
SendMessage(response);
return;
}
uint32 jobid;
ByteBuffer jobbuf(Util::ASCIIToBin(params[1].GetString()));
jobbuf >> jobid;
// Check if such job exists
if (!_jobs.count(jobid)) {
DataMgr::Instance()->Push(Share(_ip, username, false, "Job not found", Util::Date(), 1));
JSON response;
response["id"] = msg["id"];
response["result"];
@ -86,9 +103,6 @@ namespace Stratum @@ -86,9 +103,6 @@ namespace Stratum
return;
}
// check username
std::string username = params[0].GetString();
BinaryData extranonce2 = Util::ASCIIToBin(params[2].GetString());
if (extranonce2.size() != 4) {
sLog.Error(LOG_SERVER, "Wrong extranonce size");
@ -135,6 +149,8 @@ namespace Stratum @@ -135,6 +149,8 @@ namespace Stratum
share << extranonce2 << timebuf << noncebuf;
if (!job.SubmitShare(share.Binary())) {
sLog.Error(LOG_SERVER, "Duplicate share");
DataMgr::Instance()->Push(Share(_ip, username, false, "Duplicate share", Util::Date(), job.diff));
JSON response;
response["id"] = msg["id"];
response["result"];
@ -161,18 +177,12 @@ namespace Stratum @@ -161,18 +177,12 @@ namespace Stratum
// Set coinbase tx
block.tx[0] = coinbasetx;
sLog.Info(LOG_SERVER, "Coinbase hash1: %s", Util::BinToASCII(Crypto::SHA256D(coinbasebuf.Binary())).c_str());
sLog.Info(LOG_SERVER, "Coinbase hash2: %s", Util::BinToASCII(coinbasetx.GetHash()).c_str());
// Rebuilds only left side of merkle tree
block.RebuildMerkleTree();
sLog.Info(LOG_SERVER, "Merklehash: %s", Util::BinToASCII(block.merkleRootHash).c_str());
// Get block hash
BinaryData hash = block.GetHash();
sLog.Info(LOG_SERVER, "Block hash: %s", Util::BinToASCII(hash).c_str());
// Get block target
BigInt target(Util::BinToASCII(Util::Reverse(hash)), 16);
@ -187,6 +197,8 @@ namespace Stratum @@ -187,6 +197,8 @@ namespace Stratum
// Check if difficulty meets job diff
if (target > Bitcoin::DiffToTarget(job.diff)) {
sLog.Error(LOG_SERVER, "Share above target");
DataMgr::Instance()->Push(Share(_ip, username, false, "Share above target", Util::Date(), job.diff));
JSON response;
response["id"] = msg["id"];
response["result"];
@ -203,7 +215,7 @@ namespace Stratum @@ -203,7 +215,7 @@ namespace Stratum
_server->SubmitBlock(block);
} else {
sDataMgr.Push(Share(_ip, username, true, "", Util::Date(), job.diff));
DataMgr::Instance()->Push(Share(_ip, username, true, "", Util::Date(), job.diff));
JSON response;
response["id"] = msg["id"];
@ -247,11 +259,25 @@ namespace Stratum @@ -247,11 +259,25 @@ namespace Stratum
std::string username = msg["params"][0].GetString();
std::string password = msg["params"][1].GetString();
JSON response;
response["id"] = msg["id"].GetInt();
response["error"];
response["result"] = true;
SendMessage(response);
MySQL::QueryResult result = sDatabase.Query(Util::FS("SELECT * FROM `pool_worker` WHERE `username` = '%s' and `password` = '%s'", sDatabase.Escape(username).c_str(), sDatabase.Escape(password).c_str()).c_str());
if (result) {
_workers.insert(username);
JSON response;
response["id"] = msg["id"].GetInt();
response["error"];
response["result"] = true;
SendMessage(response);
} else {
JSON response;
response["id"] = msg["id"].GetInt();
response["error"].Add(int64(20));
response["error"].Add("Authentication failed");
response["error"].Add(JSON());
response["result"] = true;
SendMessage(response);
}
}
Job Client::GetJob()

3
src/server/poolserver/Stratum/Client.h

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <set>
#define MAX_PACKET 2048
@ -142,7 +143,7 @@ namespace Stratum @@ -142,7 +143,7 @@ namespace Stratum
Stratum::Server* _server;
// Authorization
std::vector<std::string> _workers;
std::set<std::string> _workers;
// Jobs
bool _subscribed;

17
src/server/poolserver/Stratum/ShareLimiter.cpp

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
#include "ShareLimiter.h"
#include "Server.h"
#include "Client.h"
#include "Log.h"
namespace Stratum
{
@ -15,23 +16,33 @@ namespace Stratum @@ -15,23 +16,33 @@ namespace Stratum
if (sinceLast < RETARGET_INTERVAL)
return true;
_lastRetarget = curTime;
while (_shares.size() && (_shares.front() < curTime - RETARGET_TIME_BUFFER))
_shares.pop_front();
uint32 interval = std::min(curTime - _startTime, uint64(RETARGET_TIME_BUFFER));
// Calculate shares/min
double speed = (_shares.size()*60) / interval;
double speed = double(_shares.size()*60) / double(interval);
// Calculate difference from pool target in %
double variance = (speed - RETARGET_SHARES_PER_MIN) / RETARGET_SHARES_PER_MIN;
double variance = speed / double(RETARGET_SHARES_PER_MIN);
sLog.Info(LOG_SERVER, "Miner variance: %f speed: %f", variance, speed);
// Check if we need to retarget
if (std::abs(variance)*100 < RETARGET_VARIANCE)
if (variance*100 < RETARGET_VARIANCE)
return true;
uint64 newDiff = double(_client->GetDifficulty()) * variance;
if (newDiff < 1)
newDiff = 1;
if (newDiff > RETARGET_MAXDIFF)
newDiff = RETARGET_MAXDIFF;
_client->SetDifficulty(newDiff, true);
return true;

14
src/server/poolserver/Stratum/ShareLimiter.h

@ -6,29 +6,23 @@ @@ -6,29 +6,23 @@
#include <deque>
#define RETARGET_INTERVAL 60
#define RETARGET_INTERVAL 20
#define RETARGET_TIME_BUFFER 60*5
#define RETARGET_SHARES_PER_MIN 15
#define RETARGET_VARIANCE 40
#define RETARGET_MAXDIFF 1000000
namespace Stratum
{
class Client;
class ShareLimiterRecord
{
public:
ShareLimiterRecord(uint64 atime, uint64 adiff) : time(atime), diff(adiff) {}
uint64 time;
uint64 diff;
};
class ShareLimiter
{
public:
ShareLimiter(Client* client) : _client(client), _lastRetarget(0)
ShareLimiter(Client* client) : _client(client)
{
_startTime = Util::Date();
_lastRetarget = _startTime;
}
bool Submit();

9
src/server/shared/MySQL/DatabaseConnection.h

@ -82,6 +82,15 @@ namespace MySQL @@ -82,6 +82,15 @@ namespace MySQL
return _stmts[index];
}
std::string Escape(std::string text)
{
char* buf = new char[text.length()*2 + 1];
mysql_real_escape_string(_mysql, buf, text.c_str(), text.length());
std::string result(buf);
delete buf;
return result;
}
private:
bool _Query(const char *sql, MYSQL_RES** result, MYSQL_FIELD** fields, uint64& rowCount, uint32& fieldCount);
bool _Query(PreparedStatement* stmt, MYSQL_RES** result, MYSQL_STMT** resultSTMT, uint32& fieldCount);

13
src/server/shared/MySQL/DatabaseOperation.h

@ -40,13 +40,22 @@ namespace MySQL @@ -40,13 +40,22 @@ namespace MySQL
class DatabaseQueryOperation : public DatabaseOperation
{
public:
DatabaseQueryOperation(const char* query, DatabaseCallback callback = NULL): DatabaseOperation(), _query(query), _callback(callback) {};
DatabaseQueryOperation(const char* query, DatabaseCallback callback = NULL): DatabaseOperation(), _callback(callback)
{
_query = new char[strlen(query)];
strcpy(_query, const_cast<char *>(query));
}
~DatabaseQueryOperation()
{
delete[] _query;
}
void Execute();
private:
DatabaseCallback _callback;
const char* _query;
char* _query;
};
typedef Util::SynchronisedQueue<DatabaseOperation*> DatabaseWorkQueue;

8
src/server/shared/MySQL/DatabaseWorkerPool.h

@ -93,6 +93,14 @@ namespace MySQL @@ -93,6 +93,14 @@ namespace MySQL
return new PreparedStatement(stmtid);
}
std::string Escape(std::string text)
{
DatabaseConnection* conn = GetSyncConnection();
std::string result = conn->Escape(text);
conn->Unlock();
return result;
}
private:
DatabaseConnection* GetSyncConnection()
{

9
src/server/shared/Util.cpp

@ -18,14 +18,9 @@ std::string Util::Date(const char* format, bool utc) @@ -18,14 +18,9 @@ std::string Util::Date(const char* format, bool utc)
return ss.str();
}
uint32 Util::Date(bool utc)
uint32 Util::Date()
{
boost::posix_time::ptime now;
if (utc)
now = boost::posix_time::second_clock::universal_time();
else
now = boost::posix_time::second_clock::local_time();
boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time();
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::time_duration diff = now - epoch;
return diff.total_seconds();

2
src/server/shared/Util.h

@ -19,7 +19,7 @@ @@ -19,7 +19,7 @@
namespace Util
{
std::string Date(const char* format, bool utc = false);
uint32 Date(bool utc = true);
uint32 Date();
std::string FS(const char *str, ...);
std::vector<std::string> Explode(std::string input, std::string delim);

Loading…
Cancel
Save