Browse Source

MySQL Database Implementation Pt 1. Todo: QueryResult, PreparedStatements

peercoin
Intel 12 years ago
parent
commit
400349c7e5
  1. 4
      src/server/poolserver/CMakeLists.txt
  2. 3
      src/server/poolserver/Database/ServerDatabaseEnv.cpp
  3. 12
      src/server/poolserver/Database/ServerDatabaseEnv.h
  4. 29
      src/server/poolserver/Main.cpp
  5. 26
      src/server/poolserver/Server/Server.cpp
  6. 2
      src/server/poolserver/Server/Server.h
  7. 4
      src/server/shared/Database/CMakeLists.txt
  8. 0
      src/server/shared/Database/Database.cpp
  9. 39
      src/server/shared/Database/Database.h
  10. 10
      src/server/shared/Database/DatabaseCallback.h
  11. 10
      src/server/shared/Database/DatabaseEnv.h
  12. 11
      src/server/shared/Database/Field.h
  13. 149
      src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp
  14. 79
      src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h
  15. 6
      src/server/shared/Database/MySQL/DatabaseEnvMySQL.h
  16. 26
      src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp
  17. 52
      src/server/shared/Database/MySQL/DatabaseOperationMySQL.h
  18. 36
      src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp
  19. 19
      src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h
  20. 158
      src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h
  21. 10
      src/server/shared/Database/MySQL/FieldMySQL.h
  22. 34
      src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp
  23. 24
      src/server/shared/Database/MySQL/PreparedStatementMySQL.h
  24. 9
      src/server/shared/Database/MySQL/QueryResultMySQL.cpp
  25. 39
      src/server/shared/Database/MySQL/QueryResultMySQL.h
  26. 12
      src/server/shared/Database/PreparedStatement.h
  27. 21
      src/server/shared/Database/QueryResult.h
  28. 2
      src/server/shared/Logging/Log.cpp
  29. 6
      src/server/shared/Logging/Log.h
  30. 17
      src/server/shared/Util.cpp
  31. 81
      src/server/shared/Util.h

4
src/server/poolserver/CMakeLists.txt

@ -1,10 +1,12 @@
# Add sources # Add sources
file(GLOB_RECURSE sources_Server Server/*.cpp Server/*.h) file(GLOB_RECURSE sources_Server Server/*.cpp Server/*.h)
file(GLOB_RECURSE sources_Database Database/*.cpp Database/*.h)
file(GLOB sources_localdir *.cpp *.h) file(GLOB sources_localdir *.cpp *.h)
set(sources_Poolserver set(sources_Poolserver
${sources_Server} ${sources_Server}
${sources_Database}
${sources_localdir} ${sources_localdir}
) )
@ -14,6 +16,7 @@ include_directories(
${CMAKE_SOURCE_DIR}/src/server/shared/Database ${CMAKE_SOURCE_DIR}/src/server/shared/Database
${CMAKE_SOURCE_DIR}/src/server/shared/Logging ${CMAKE_SOURCE_DIR}/src/server/shared/Logging
${CMAKE_SOURCE_DIR}/src/server/poolserver/Server ${CMAKE_SOURCE_DIR}/src/server/poolserver/Server
${CMAKE_SOURCE_DIR}/src/server/poolserver/Database
${Boost_INCLUDE_DIR} ${Boost_INCLUDE_DIR}
${MYSQL_INCLUDE_DIR} ${MYSQL_INCLUDE_DIR}
) )
@ -22,7 +25,6 @@ include_directories(
add_executable(poolserver add_executable(poolserver
${sources_Poolserver} ${sources_Poolserver}
) )
message(status "SHared files: ${sources_Shared}")
# Link libraries # Link libraries
target_link_libraries(poolserver target_link_libraries(poolserver

3
src/server/poolserver/Database/ServerDatabaseEnv.cpp

@ -0,0 +1,3 @@
#include "ServerDatabaseEnv.h"
ServerDatabaseWorkerPoolMySQL sDatabase;

12
src/server/poolserver/Database/ServerDatabaseEnv.h

@ -0,0 +1,12 @@
#ifndef SERVER_DATABASE_ENV_H_
#define SERVER_DATABASE_ENV_H_
#include "DatabaseEnv.h"
class ServerDatabaseWorkerPoolMySQL : public DatabaseWorkerPoolMySQL
{
};
extern ServerDatabaseWorkerPoolMySQL sDatabase;
#endif

29
src/server/poolserver/Main.cpp

@ -13,6 +13,10 @@ bool InitConfig(int argc, char *argv[])
boost::program_options::options_description descServer; boost::program_options::options_description descServer;
boost::program_options::options_description descStratum; boost::program_options::options_description descStratum;
boost::program_options::options_description descLogging; boost::program_options::options_description descLogging;
boost::program_options::options_description descDatabase;
#ifdef WITH_MYSQL
boost::program_options::options_description descMySQL;
#endif
boost::program_options::options_description cmdlineOptions; boost::program_options::options_description cmdlineOptions;
boost::program_options::options_description fileOptions; boost::program_options::options_description fileOptions;
@ -42,9 +46,26 @@ bool InitConfig(int argc, char *argv[])
("LogFileLevel", boost::program_options::value<uint32_t>()->default_value(LOG_LEVEL_WARN), "File log level (0-None, 1-Error, 2-Warn, 3-Info, 4-Debug)") ("LogFileLevel", boost::program_options::value<uint32_t>()->default_value(LOG_LEVEL_WARN), "File log level (0-None, 1-Error, 2-Warn, 3-Info, 4-Debug)")
("LogFileDebugMask", boost::program_options::value<uint32_t>()->default_value(0), "File log debug mask") ("LogFileDebugMask", boost::program_options::value<uint32_t>()->default_value(0), "File log debug mask")
; ;
// Database
#ifdef WITH_MYSQL
descDatabase.add_options()
("DatabaseDriver", boost::program_options::value<std::string>()->default_value("mysql"), "Database Driver")
;
descMySQL.add_options()
("MySQLHost", boost::program_options::value<std::string>()->default_value("127.0.0.1"), "MySQL Host")
("MySQLPort", boost::program_options::value<uint16_t>()->default_value(3306), "MySQL Port")
("MySQLUser", boost::program_options::value<std::string>()->default_value("root"), "MySQL User")
("MySQLPass", boost::program_options::value<std::string>()->default_value(""), "MySQL Password")
("MySQLDatabase", boost::program_options::value<std::string>()->default_value("poolserver"), "MySQL Database")
("MySQLSyncThreads", boost::program_options::value<uint16_t>()->default_value(2), "MySQL Sync Threads to Create")
("MySQLAsyncThreads", boost::program_options::value<uint16_t>()->default_value(2), "MySQL Async Threads to Create")
;
descDatabase.add(descMySQL);
#endif
cmdlineOptions.add(descGeneric).add(descServer).add(descStratum).add(descLogging); cmdlineOptions.add(descGeneric).add(descServer).add(descStratum).add(descLogging).add(descDatabase);
fileOptions.add(descServer).add(descStratum).add(descLogging); fileOptions.add(descServer).add(descStratum).add(descLogging).add(descDatabase);
store(boost::program_options::command_line_parser(argc, argv).options(cmdlineOptions).run(), sConfig.vm); store(boost::program_options::command_line_parser(argc, argv).options(cmdlineOptions).run(), sConfig.vm);
notify(sConfig.vm); notify(sConfig.vm);
@ -54,9 +75,9 @@ bool InitConfig(int argc, char *argv[])
return false; return false;
} }
std::ifstream ifs(sConfig.Get<std::string>("LogFilePath").c_str()); std::ifstream ifs(sConfig.Get<std::string>("config").c_str());
if (!ifs) { if (!ifs.is_open()) {
sLog.Error(LOG_GENERAL, "Failed opening config file: %s", sConfig.Get<std::string>("LogFilePath").c_str()); sLog.Error(LOG_GENERAL, "Failed opening config file: %s", sConfig.Get<std::string>("LogFilePath").c_str());
return false; return false;
} }

26
src/server/poolserver/Server/Server.cpp

@ -2,8 +2,10 @@
#include "Config.h" #include "Config.h"
#include "Log.h" #include "Log.h"
#include "Stratum/Server.h" #include "Stratum/Server.h"
#include "ServerDatabaseEnv.h"
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/algorithm/string.hpp>
#include <iostream> #include <iostream>
Server::Server() : serverLoops(0) Server::Server() : serverLoops(0)
@ -18,6 +20,11 @@ Server::~Server()
int Server::Run() int Server::Run()
{ {
sLog.Info(LOG_SERVER, "Server is starting..."); sLog.Info(LOG_SERVER, "Server is starting...");
InitDatabase();
sDatabase.Execute("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')");
sDatabase.ExecuteAsync("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')");
// Start stratum server // Start stratum server
sLog.Info(LOG_SERVER, "Starting stratum"); sLog.Info(LOG_SERVER, "Starting stratum");
@ -59,6 +66,8 @@ int Server::Run()
} }
sLog.Info(LOG_SERVER, "Server is stopping..."); sLog.Info(LOG_SERVER, "Server is stopping...");
sDatabase.Close();
return exitcode; return exitcode;
} }
@ -67,3 +76,20 @@ void Server::Update(uint32_t diff)
{ {
} }
bool Server::InitDatabase()
{
if (boost::iequals(sConfig.Get<std::string>("DatabaseDriver"), "mysql")) {
MySQLConnectionInfo connInfo;
connInfo.Host = sConfig.Get<std::string>("MySQLHost");
connInfo.Port = sConfig.Get<uint16_t>("MySQLPort");
connInfo.User = sConfig.Get<std::string>("MySQLUser");
connInfo.Pass = sConfig.Get<std::string>("MySQLPass");
connInfo.DB = sConfig.Get<std::string>("MySQLDatabase");
return sDatabase.Open(connInfo, sConfig.Get<uint16_t>("MySQLSyncThreads"), sConfig.Get<uint16_t>("MySQLAsyncThreads"));
} else {
sLog.Error(LOG_SERVER, "Database Driver '%s' not found.", sConfig.Get<std::string>("DatabaseDriver").c_str());
return false;
}
}

2
src/server/poolserver/Server/Server.h

@ -23,6 +23,8 @@ public:
int Run(); int Run();
void Update(uint32_t); void Update(uint32_t);
bool InitDatabase();
}; };
#endif #endif

4
src/server/shared/Database/CMakeLists.txt

@ -1,9 +1,10 @@
file(GLOB sources_MySQL MySQL/*.cpp MySQL/*.h) file(GLOB_RECURSE sources_MySQL MySQL/*.cpp MySQL/*.h)
file(GLOB sources_localdir *.cpp *.h) file(GLOB sources_localdir *.cpp *.h)
set(sources_Database set(sources_Database
${sources_localdir} ${sources_localdir}
PARENT_SCOPE
) )
if(MYSQL) if(MYSQL)
@ -11,5 +12,6 @@ if(MYSQL)
${sources_Database} ${sources_Database}
${sources_MySQL} ${sources_MySQL}
${MYSQL_INCLUDE_DIR} ${MYSQL_INCLUDE_DIR}
PARENT_SCOPE
) )
endif() endif()

0
src/server/shared/Database/Database.cpp

39
src/server/shared/Database/Database.h

@ -1,33 +1,30 @@
#ifndef DATABASE_H_ #ifndef DATABASE_H_
#define DATABASE_H_ #define DATABASE_H_
class Database; #include "QueryResult.h"
class ResultSet; #include "PreparedStatement.h"
class Fields; #include "DatabaseCallback.h"
class PreparedStatement; #include <boost/thread.hpp>
enum PreparedStatementEnum
{
STMT_INSERT_SHARE = 1,
};
class Database class Database
{ {
public: public:
// Ping!
virtual Ping();
// Queries // Queries
virtual void Execute(const char* query); virtual bool Execute(const char* query) = 0;
virtual void Execute(PreparedStatement* stmt); virtual ResultSet* Query(const char* query) = 0;
virtual ResultSet* Query(const char* query);
virtual ResultSet* Query(PreparedStatement* stmt); // Stmt
virtual bool Execute(PreparedStatement* stmt) = 0;
virtual ResultSet* Query(PreparedStatement* stmt) = 0;
// Async
virtual bool ExecuteAsync(const char* query) = 0;
virtual bool ExecuteAsync(PreparedStatement* stmt) = 0;
virtual bool QueryAsync(DatabaseCallback callback, const char* query) = 0;
virtual bool QueryAsync(DatabaseCallback callback, PreparedStatement* stmt) = 0;
// Prepared Statements // Prepared Statements
virtual PreparedStatement* GetPreparedStatement(PreparedStatementEnum smtid); virtual PreparedStatement* GetPreparedStatement(uint32_t index) = 0;
}; };
#endif #endif

10
src/server/shared/Database/DatabaseCallback.h

@ -0,0 +1,10 @@
#ifndef DATABASE_CALLBACK_H_
#define DATABASE_CALLBACK_H_
#include "QueryResult.h"
#include <boost/function.hpp>
typedef boost::function<void(ResultSet*)> DatabaseCallback;
#endif

10
src/server/shared/Database/DatabaseEnv.h

@ -0,0 +1,10 @@
#ifndef DATABASE_ENV_H_
#define DATABASE_ENV_H_
#include "Database.h"
#ifdef WITH_MYSQL
#include "MySQL/DatabaseEnvMySQL.h"
#endif
#endif

11
src/server/shared/Database/Field.h

@ -0,0 +1,11 @@
#ifndef FIELD_H_
#define FIELD_H_
class Field
{
public:
//template<class T>
//virtual Get();
};
#endif

149
src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp

@ -0,0 +1,149 @@
#include "DatabaseConnectionMySQL.h"
#include "Log.h"
DatabaseConnectionMySQL::DatabaseConnectionMySQL(MySQLConnectionInfo connInfo, DatabaseWorkQueueMySQL* asyncQueue) :
_mysql(NULL), _asyncQueue(asyncQueue), _worker(NULL), _connectionInfo(connInfo)
{
if (_asyncQueue) {
_worker = new DatabaseWorkerMySQL(_asyncQueue, this);
Type = MYSQL_CONN_ASYNC;
} else
Type = MYSQL_CONN_SYNC;
}
DatabaseConnectionMySQL::~DatabaseConnectionMySQL()
{
assert(_mysql);
for (uint32_t i = 0; i < _stmts.size(); ++i)
delete _stmts[i];
mysql_close(_mysql);
}
bool DatabaseConnectionMySQL::Open()
{
MYSQL* mysqlInit;
mysqlInit = mysql_init(NULL);
if (!mysqlInit)
{
sLog.Error(LOG_DATABASE, "Could not initialize Mysql connection to database `%s`", _connectionInfo.DB.c_str());
return false;
}
mysql_options(mysqlInit, MYSQL_SET_CHARSET_NAME, "utf8");
_mysql = mysql_real_connect(mysqlInit, _connectionInfo.Host.c_str(), _connectionInfo.User.c_str(),
_connectionInfo.Pass.c_str(), _connectionInfo.DB.c_str(), _connectionInfo.Port, NULL, 0);
if (_mysql)
{
sLog.Info(LOG_DATABASE, "Connected to MySQL database at %s", _connectionInfo.Host.c_str());
mysql_autocommit(_mysql, 1);
// set connection properties to UTF8 to properly handle locales for different
// server configs - core sends data in UTF8, so MySQL must expect UTF8 too
mysql_set_character_set(_mysql, "utf8");
return true;
}
else
{
sLog.Error(LOG_DATABASE, "Could not connect to MySQL database at %s: %s\n", _connectionInfo.Host.c_str(), mysql_error(mysqlInit));
mysql_close(mysqlInit);
return false;
}
}
void DatabaseConnectionMySQL::Close()
{
delete this;
}
bool DatabaseConnectionMySQL::Execute(const char* query)
{
sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Execute(): %s", query);
if (!query || !_mysql)
return false;
if (mysql_query(_mysql, query))
{
uint32_t lErrno = mysql_errno(_mysql);
sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql));
if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection)
return Execute(query); // Try again
return false;
}
else
return true;
}
ResultSetMySQL* DatabaseConnectionMySQL::Query(const char* query)
{
sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Query(): %s", query);
if (!query)
return NULL;
MYSQL_RES *result = NULL;
MYSQL_FIELD *fields = NULL;
uint64_t rowCount = 0;
uint32_t fieldCount = 0;
if (!_Query(query, result, fields, rowCount, fieldCount))
return NULL;
return new ResultSetMySQL(result, fields, rowCount, fieldCount);
}
bool DatabaseConnectionMySQL::_Query(const char *query, MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t& rowCount, uint32_t& fieldCount)
{
if (!_mysql)
return false;
if (mysql_query(_mysql, query))
{
uint32_t lErrno = mysql_errno(_mysql);
sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql));
if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection)
return _Query(query, result, fields, rowCount, fieldCount); // We try again
return false;
}
result = mysql_store_result(_mysql);
rowCount = mysql_affected_rows(_mysql);
fieldCount = mysql_field_count(_mysql);
if (!result)
return false;
if (!rowCount)
{
mysql_free_result(result);
return false;
}
fields = mysql_fetch_fields(result);
return true;
}
bool DatabaseConnectionMySQL::Execute(PreparedStatement* stmt)
{
}
ResultSetMySQL* DatabaseConnectionMySQL::Query(PreparedStatement* stmt)
{
}
bool DatabaseConnectionMySQL::_HandleMySQLErrno(uint32_t lErrno)
{
return false;
}

79
src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h

@ -0,0 +1,79 @@
#ifndef DATABASE_CONNECTION_MYSQL_H_
#define DATABASE_CONNECTION_MYSQL_H_
#include "DatabaseOperationMySQL.h"
#include "DatabaseWorkerMySQL.h"
#include "QueryResultMySQL.h"
#include "PreparedStatementMySQL.h"
#include <boost/thread.hpp>
#include <mysql.h>
enum MySQLConnectionType
{
MYSQL_CONN_SYNC,
MYSQL_CONN_ASYNC,
MYSQL_CONN_SIZE
};
struct MySQLConnectionInfo
{
std::string Host;
uint16_t Port;
std::string User;
std::string Pass;
std::string DB;
};
class DatabaseConnectionMySQL
{
public:
DatabaseConnectionMySQL(MySQLConnectionInfo connInfo, DatabaseWorkQueueMySQL* asyncQueue = NULL);
~DatabaseConnectionMySQL();
bool Open();
void Close();
// Ping!
void Ping()
{
mysql_ping(_mysql);
}
// Queries
bool Execute(const char* query);
ResultSetMySQL* Query(const char* query);
// Stmt
bool Execute(PreparedStatement* stmt);
ResultSetMySQL* Query(PreparedStatement* stmt);
// Locking
bool LockIfReady()
{
return _mutex.try_lock();
}
void Unlock()
{
_mutex.unlock();
}
MySQLConnectionType Type;
void PrepareStatement(uint32_t index, const char* query);
private:
bool _Query(const char *sql, MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t& pRowCount, uint32_t& pFieldCount);
bool _HandleMySQLErrno(uint32_t lErrno);
boost::mutex _mutex;
MYSQL* _mysql;
DatabaseWorkQueueMySQL* _asyncQueue;
DatabaseWorkerMySQL* _worker;
std::vector<PreparedStatementMySQL*> _stmts;
MySQLConnectionInfo _connectionInfo;
};
#endif

6
src/server/shared/Database/MySQL/DatabaseEnvMySQL.h

@ -0,0 +1,6 @@
#ifndef DATABASE_ENV_MYSQL_H_
#define DATABASE_ENV_MYSQL_H_
#include "DatabaseWorkerPoolMySQL.h"
#endif

26
src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp

@ -0,0 +1,26 @@
#include "DatabaseOperationMySQL.h"
#include "DatabaseConnectionMySQL.h"
void DatabasePingOperationMySQL::Execute()
{
_conn->Ping();
}
void DatabasePreparedStatementOperationMySQL::Execute()
{
if (_callback) {
ResultSetMySQL* result = _conn->Query(_stmt);
_callback(result);
} else
_conn->Execute(_stmt);
}
void DatabaseQueryOperationMySQL::Execute()
{
if (_callback) {
ResultSetMySQL* result = _conn->Query(_query);
_callback(result);
} else
_conn->Execute(_query);
}

52
src/server/shared/Database/MySQL/DatabaseOperationMySQL.h

@ -0,0 +1,52 @@
#ifndef DATABASE_OPERATION_MYSQL_H_
#define DATABASE_OPERATION_MYSQL_H_
#include "Util.h"
#include "DatabaseCallback.h"
#include "PreparedStatement.h"
#include <boost/bind.hpp>
class DatabaseConnectionMySQL;
class DatabaseOperationMySQL
{
public:
DatabaseOperationMySQL(): _conn(NULL) {}
virtual void Execute() = 0;
void SetConnection(DatabaseConnectionMySQL* conn) { _conn = conn; }
protected:
DatabaseConnectionMySQL* _conn;
};
class DatabasePingOperationMySQL : public DatabaseOperationMySQL
{
void Execute();
};
class DatabasePreparedStatementOperationMySQL : public DatabaseOperationMySQL
{
public:
DatabasePreparedStatementOperationMySQL(PreparedStatement* stmt, DatabaseCallback callback = NULL): DatabaseOperationMySQL(), _stmt(stmt), _callback(callback) {};
void Execute();
private:
DatabaseCallback _callback;
PreparedStatement* _stmt;
};
class DatabaseQueryOperationMySQL : public DatabaseOperationMySQL
{
public:
DatabaseQueryOperationMySQL(const char* query, DatabaseCallback callback = NULL): DatabaseOperationMySQL(), _query(query), _callback(callback) {};
void Execute();
private:
DatabaseCallback _callback;
const char* _query;
};
typedef Util::SynchronisedQueue<DatabaseOperationMySQL*> DatabaseWorkQueueMySQL;
#endif

36
src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp

@ -0,0 +1,36 @@
#include "DatabaseWorkerMySQL.h"
#include "Log.h"
DatabaseWorkerMySQL::DatabaseWorkerMySQL(DatabaseWorkQueueMySQL* asyncQueue, DatabaseConnectionMySQL* conn) :
_asyncQueue(asyncQueue), _conn(conn)
{
Activate();
}
void DatabaseWorkerMySQL::Work()
{
sLog.Debug(LOG_DATABASE, "Database worker thread started");
if (!this->_asyncQueue)
return;
DatabaseOperationMySQL* op = NULL;
for (;;)
{
op = this->_asyncQueue->Dequeue();
if (!op) {
sLog.Debug(LOG_DATABASE, "Database worker thread exiting...");
return;
}
sLog.Debug(LOG_DATABASE, "Database worker working...");
op->SetConnection(_conn);
op->Execute();
delete op;
sLog.Debug(LOG_DATABASE, "Database worker finished!");
}
}

19
src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h

@ -0,0 +1,19 @@
#ifndef DATABASE_WORKER_MYSQL_H_
#define DATABASE_WORKER_MYSQL_H_
#include "Util.h"
#include "DatabaseOperationMySQL.h"
class DatabaseConnectionMySQL;
class DatabaseWorkerMySQL : public Util::GenericWorker
{
public:
DatabaseWorkerMySQL(DatabaseWorkQueueMySQL* asyncQueue, DatabaseConnectionMySQL* conn);
void Work();
private:
DatabaseWorkQueueMySQL* _asyncQueue;
DatabaseConnectionMySQL* _conn;
};
#endif

158
src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h

@ -0,0 +1,158 @@
#ifndef DATABASE_WORKER_POOL_MYSQL_H_
#define DATABASE_WORKER_POOL_MYSQL_H_
#include "Database.h"
#include "DatabaseConnectionMySQL.h"
#include "QueryResultMySQL.h"
#include "Log.h"
#include <vector>
class DatabaseWorkerPoolMySQL : public Database
{
public:
DatabaseWorkerPoolMySQL() : _asyncQueue(new DatabaseWorkQueueMySQL()) { }
~DatabaseWorkerPoolMySQL()
{
delete _asyncQueue;
}
bool Open(MySQLConnectionInfo connInfo, uint8_t syncThreads, uint8_t asyncThreads)
{
bool res = true;
_connectionInfo = connInfo;
sLog.Info(LOG_DATABASE, "Opening MySQL Database Pool '%s'. Asynchronous threads: %u, synchronous threads: %u.", _connectionInfo.DB.c_str(), asyncThreads, syncThreads);
for (uint8_t i = 0; i < syncThreads; ++i)
{
DatabaseConnectionMySQL* conn = new DatabaseConnectionMySQL(_connectionInfo);
res &= conn->Open();
_connections[MYSQL_CONN_SYNC].push_back(conn);
}
for (uint8_t i = 0; i < syncThreads; ++i)
{
DatabaseConnectionMySQL* conn = new DatabaseConnectionMySQL(_connectionInfo, _asyncQueue);
res &= conn->Open();
_connections[MYSQL_CONN_ASYNC].push_back(conn);
}
if (res)
sLog.Info(LOG_DATABASE, "MySQL Database Pool '%s' opened successfully. %u total connections.", _connectionInfo.DB.c_str(), _connections[MYSQL_CONN_SYNC].size()+_connections[MYSQL_CONN_ASYNC].size());
else
sLog.Error(LOG_DATABASE, "Failed opening MySQL Database Pool to '%s'.", _connectionInfo.DB.c_str());
return res;
}
void Close()
{
sLog.Info(LOG_DATABASE, "Closing MySQL Database Pool '%s'.", _connectionInfo.DB.c_str());
// Stop worker threads
_asyncQueue->Stop();
for (uint8_t i = 0; i < _connections[MYSQL_CONN_SYNC].size(); ++i)
{
DatabaseConnectionMySQL* conn = _connections[MYSQL_CONN_SYNC][i];
conn->Close();
}
for (uint8_t i = 0; i < _connections[MYSQL_CONN_ASYNC].size(); ++i)
{
DatabaseConnectionMySQL* conn = _connections[MYSQL_CONN_ASYNC][i];
conn->Close();
}
sLog.Info(LOG_DATABASE, "Closed all connections to MySQL Database Pool '%s'.", _connectionInfo.DB.c_str());
}
// Queries
bool Execute(const char* query)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
bool result = conn->Execute(query);
conn->Unlock();
return result;
}
ResultSetMySQL* Query(const char* query)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
ResultSetMySQL* result = conn->Query(query);
conn->Unlock();
return result;
}
// Stmt
bool Execute(PreparedStatement* stmt)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
bool result = conn->Execute(stmt);
conn->Unlock();
return result;
}
ResultSetMySQL* Query(PreparedStatement* stmt)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
ResultSetMySQL* result = conn->Query(stmt);
conn->Unlock();
return result;
}
// Async
bool ExecuteAsync(const char* query)
{
DatabaseQueryOperationMySQL* op = new DatabaseQueryOperationMySQL(query);
_asyncQueue->Enqueue(op);
return true;
}
bool ExecuteAsync(PreparedStatement* stmt)
{
DatabasePreparedStatementOperationMySQL* op = new DatabasePreparedStatementOperationMySQL(stmt);
_asyncQueue->Enqueue(op);
return true;
}
bool QueryAsync(DatabaseCallback callback, const char* query)
{
DatabaseQueryOperationMySQL* op = new DatabaseQueryOperationMySQL(query, callback);
_asyncQueue->Enqueue(op);
return true;
}
bool QueryAsync(DatabaseCallback callback, PreparedStatement* stmt)
{
DatabasePreparedStatementOperationMySQL* op = new DatabasePreparedStatementOperationMySQL(stmt, callback);
_asyncQueue->Enqueue(op);
return true;
}
// Prepared Statements
PreparedStatement* GetPreparedStatement(uint32_t stmtid)
{
return NULL;//new PreparedStatement(stmtid);
}
private:
DatabaseConnectionMySQL* GetSyncConnection()
{
uint32_t i;
uint8_t conn_size = _connections[MYSQL_CONN_SYNC].size();
DatabaseConnectionMySQL* conn = NULL;
// Block until we find a free connection
for (;;)
{
conn = _connections[MYSQL_CONN_SYNC][++i % conn_size];
if (conn->LockIfReady())
break;
}
return conn;
}
std::vector<DatabaseConnectionMySQL*> _connections[MYSQL_CONN_SIZE];
MySQLConnectionInfo _connectionInfo;
DatabaseWorkQueueMySQL* _asyncQueue;
};
#endif

10
src/server/shared/Database/MySQL/FieldMySQL.h

@ -0,0 +1,10 @@
#ifndef FIELD_MYSQL_H_
#define FIELD_MYSQL_H_
#include "Field.h"
class FieldMySQL : public Field
{
};
#endif

34
src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp

@ -0,0 +1,34 @@
#include "PreparedStatementMySQL.h"
#include <cstring>
PreparedStatementMySQL::PreparedStatementMySQL(MYSQL_STMT* stmt) :
_stmt(stmt), _bind(NULL), PreparedStatement(0)
{
_paramCount = mysql_stmt_param_count(stmt);
_bind = new MYSQL_BIND[_paramCount];
memset(_bind, 0, sizeof(MYSQL_BIND)*_paramCount);
}
PreparedStatementMySQL::~PreparedStatementMySQL()
{
ClearParameters();
if (_stmt->bind_result_done)
{
delete[] _stmt->bind->length;
delete[] _stmt->bind->is_null;
}
mysql_stmt_close(_stmt);
delete[] _bind;
this->~PreparedStatement();
}
void PreparedStatementMySQL::ClearParameters()
{
for (uint8_t i = 0; i < _paramCount; ++i)
{
delete _bind[i].length;
_bind[i].length = NULL;
delete[] (char*) _bind[i].buffer;
_bind[i].buffer = NULL;
}
}

24
src/server/shared/Database/MySQL/PreparedStatementMySQL.h

@ -0,0 +1,24 @@
#ifndef PREPARED_STATEMENT_MYSQL_H_
#define PREPARED_STATEMENT_MYSQL_H_
#include "PreparedStatement.h"
#include <boost/cstdint.hpp>
#include <mysql.h>
class PreparedStatementMySQL : public PreparedStatement
{
public:
PreparedStatementMySQL(MYSQL_STMT* stmt);
~PreparedStatementMySQL();
template<class T>
void Set(const uint8_t index, const T value);
void ClearParameters();
private:
MYSQL_STMT* _stmt;
MYSQL_BIND* _bind;
uint8_t _paramCount;
};
#endif

9
src/server/shared/Database/MySQL/QueryResultMySQL.cpp

@ -0,0 +1,9 @@
#include "QueryResultMySQL.h"
ResultSetMySQL::ResultSetMySQL(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount)
{
}
ResultSetMySQL::~ResultSetMySQL()
{
}

39
src/server/shared/Database/MySQL/QueryResultMySQL.h

@ -0,0 +1,39 @@
#ifndef QUERY_RESULT_MYSQL_H_
#define QUERY_RESULT_MYSQL_H_
#include "QueryResult.h"
#include "FieldMySQL.h"
#include <boost/shared_ptr.hpp>
#include <cassert>
#include <mysql.h>
class ResultSetMySQL : public ResultSet
{
public:
ResultSetMySQL(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount);
~ResultSetMySQL();
// Metadata
uint64_t GetRowCount()
{
return _rowCount;
}
uint32_t GetFieldCount()
{
return _fieldCount;
}
bool NextRow() {};
Field* Fetch() {};
private:
uint64_t _rowCount;
FieldMySQL* _currentRow;
uint32_t _fieldCount;
void CleanUp();
MYSQL_RES* _result;
MYSQL_FIELD* _fields;
};
#endif

12
src/server/shared/Database/PreparedStatement.h

@ -0,0 +1,12 @@
#ifndef PREPARED_STATEMENT_H_
#define PREPARED_STATEMENT_H_
#include <boost/cstdint.hpp>
class PreparedStatement
{
public:
PreparedStatement(uint32_t index) {};
};
#endif

21
src/server/shared/Database/QueryResult.h

@ -0,0 +1,21 @@
#ifndef RESULTSET_H_
#define RESULTSET_H_
#include "Field.h"
#include <boost/shared_ptr.hpp>
#include <boost/cstdint.hpp>
class ResultSet
{
public:
// Metadata
virtual uint64_t GetRowCount() = 0;
virtual uint32_t GetFieldCount() = 0;
virtual bool NextRow() = 0;
virtual Field* Fetch() = 0;
};
typedef boost::shared_ptr<ResultSet> QueryResult;
#endif

2
src/server/shared/Logging/Log.cpp

@ -72,6 +72,8 @@ void Log::OpenLogFile(std::string path)
void Log::Write(LogLevel level, LogType type, std::string msg) void Log::Write(LogLevel level, LogType type, std::string msg)
{ {
boost::lock_guard<boost::mutex> lock(_mutex);
switch(level) switch(level)
{ {
case LOG_LEVEL_ERROR: case LOG_LEVEL_ERROR:

6
src/server/shared/Logging/Log.h

@ -7,6 +7,7 @@
#include <cmath> #include <cmath>
#include <cstdarg> #include <cstdarg>
#include <boost/cstdint.hpp> #include <boost/cstdint.hpp>
#include <boost/thread.hpp>
#define MAX_MSG_LEN 32*1024 #define MAX_MSG_LEN 32*1024
#define ATTR_PRINTF(F, V) __attribute__ ((format (printf, F, V))) #define ATTR_PRINTF(F, V) __attribute__ ((format (printf, F, V)))
@ -14,7 +15,8 @@
enum LogType enum LogType
{ {
LOG_GENERAL = 0, LOG_GENERAL = 0,
LOG_SERVER = 2, LOG_SERVER = 1,
LOG_DATABASE = 2
}; };
enum LogLevel enum LogLevel
@ -46,6 +48,8 @@ private:
std::string logfileloc; std::string logfileloc;
std::ofstream logfile; std::ofstream logfile;
boost::mutex _mutex;
}; };
extern Log sLog; extern Log sLog;

17
src/server/shared/Util.cpp

@ -0,0 +1,17 @@
#include "Util.h"
std::string Util::Date(const char* format, bool utc)
{
boost::posix_time::ptime now;
if (utc)
now = boost::posix_time::second_clock::universal_time();
else
now = boost::posix_time::second_clock::local_time();
std::stringstream ss;
boost::posix_time::time_facet *facet = new boost::posix_time::time_facet(format);
ss.imbue(std::locale(std::cout.getloc(), facet));
ss << now;
return ss.str();
}

81
src/server/shared/Util.h

@ -3,25 +3,76 @@
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <queue>
#include <boost/date_time.hpp> #include <boost/date_time.hpp>
#include <boost/thread.hpp>
#include <boost/cstdint.hpp>
namespace Util namespace Util
{ {
std::string Date(const char* format, bool utc = false) std::string Date(const char* format, bool utc = false);
{
boost::posix_time::ptime now; template <typename T>
if (utc) class SynchronisedQueue
now = boost::posix_time::second_clock::universal_time(); {
else public:
now = boost::posix_time::second_clock::local_time(); SynchronisedQueue() : __endQueue(false) {}
std::stringstream ss; void Enqueue(const T& data)
boost::posix_time::time_facet *facet = new boost::posix_time::time_facet(format); {
ss.imbue(std::locale(std::cout.getloc(), facet)); boost::unique_lock<boost::mutex> lock(__mutex);
ss << now;
__queue.push(data);
return ss.str();
} __cond.notify_one();
}
T Dequeue()
{
boost::unique_lock<boost::mutex> lock(__mutex);
while (__queue.empty() && !__endQueue)
__cond.wait(lock);
if (__endQueue)
return NULL;
T result = __queue.front();
__queue.pop();
return result;
}
void Stop()
{
__endQueue = true;
__cond.notify_all();
}
uint32_t Size()
{
boost::unique_lock<boost::mutex> lock(__mutex);
return __queue.size();
}
private:
bool __endQueue;
std::queue<T> __queue;
boost::mutex __mutex;
boost::condition_variable __cond;
};
class GenericWorker
{
public:
void Activate()
{
_thread = new boost::thread(boost::bind(&GenericWorker::Work, this));
}
private:
virtual void Work() = 0;
boost::thread* _thread;
};
} }
#endif #endif

Loading…
Cancel
Save