diff --git a/src/server/poolserver/CMakeLists.txt b/src/server/poolserver/CMakeLists.txt index 9dde2f0..90cb969 100644 --- a/src/server/poolserver/CMakeLists.txt +++ b/src/server/poolserver/CMakeLists.txt @@ -1,10 +1,12 @@ # Add sources file(GLOB_RECURSE sources_Server Server/*.cpp Server/*.h) +file(GLOB_RECURSE sources_Database Database/*.cpp Database/*.h) file(GLOB sources_localdir *.cpp *.h) set(sources_Poolserver ${sources_Server} + ${sources_Database} ${sources_localdir} ) @@ -14,6 +16,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/src/server/shared/Database ${CMAKE_SOURCE_DIR}/src/server/shared/Logging ${CMAKE_SOURCE_DIR}/src/server/poolserver/Server + ${CMAKE_SOURCE_DIR}/src/server/poolserver/Database ${Boost_INCLUDE_DIR} ${MYSQL_INCLUDE_DIR} ) @@ -22,7 +25,6 @@ include_directories( add_executable(poolserver ${sources_Poolserver} ) -message(status "SHared files: ${sources_Shared}") # Link libraries target_link_libraries(poolserver diff --git a/src/server/poolserver/Database/ServerDatabaseEnv.cpp b/src/server/poolserver/Database/ServerDatabaseEnv.cpp new file mode 100644 index 0000000..c043bdd --- /dev/null +++ b/src/server/poolserver/Database/ServerDatabaseEnv.cpp @@ -0,0 +1,3 @@ +#include "ServerDatabaseEnv.h" + +ServerDatabaseWorkerPoolMySQL sDatabase; \ No newline at end of file diff --git a/src/server/poolserver/Database/ServerDatabaseEnv.h b/src/server/poolserver/Database/ServerDatabaseEnv.h new file mode 100644 index 0000000..c63d2e1 --- /dev/null +++ b/src/server/poolserver/Database/ServerDatabaseEnv.h @@ -0,0 +1,12 @@ +#ifndef SERVER_DATABASE_ENV_H_ +#define SERVER_DATABASE_ENV_H_ + +#include "DatabaseEnv.h" + +class ServerDatabaseWorkerPoolMySQL : public DatabaseWorkerPoolMySQL +{ +}; + +extern ServerDatabaseWorkerPoolMySQL sDatabase; + +#endif \ No newline at end of file diff --git a/src/server/poolserver/Main.cpp b/src/server/poolserver/Main.cpp index 3a1f828..d08edb7 100644 --- a/src/server/poolserver/Main.cpp +++ b/src/server/poolserver/Main.cpp @@ -13,6 +13,10 @@ bool InitConfig(int argc, char *argv[]) boost::program_options::options_description descServer; boost::program_options::options_description descStratum; boost::program_options::options_description descLogging; + boost::program_options::options_description descDatabase; + #ifdef WITH_MYSQL + boost::program_options::options_description descMySQL; + #endif boost::program_options::options_description cmdlineOptions; boost::program_options::options_description fileOptions; @@ -42,9 +46,26 @@ bool InitConfig(int argc, char *argv[]) ("LogFileLevel", boost::program_options::value()->default_value(LOG_LEVEL_WARN), "File log level (0-None, 1-Error, 2-Warn, 3-Info, 4-Debug)") ("LogFileDebugMask", boost::program_options::value()->default_value(0), "File log debug mask") ; + + // Database + #ifdef WITH_MYSQL + descDatabase.add_options() + ("DatabaseDriver", boost::program_options::value()->default_value("mysql"), "Database Driver") + ; + descMySQL.add_options() + ("MySQLHost", boost::program_options::value()->default_value("127.0.0.1"), "MySQL Host") + ("MySQLPort", boost::program_options::value()->default_value(3306), "MySQL Port") + ("MySQLUser", boost::program_options::value()->default_value("root"), "MySQL User") + ("MySQLPass", boost::program_options::value()->default_value(""), "MySQL Password") + ("MySQLDatabase", boost::program_options::value()->default_value("poolserver"), "MySQL Database") + ("MySQLSyncThreads", boost::program_options::value()->default_value(2), "MySQL Sync Threads to Create") + ("MySQLAsyncThreads", boost::program_options::value()->default_value(2), "MySQL Async Threads to Create") + ; + descDatabase.add(descMySQL); + #endif - cmdlineOptions.add(descGeneric).add(descServer).add(descStratum).add(descLogging); - fileOptions.add(descServer).add(descStratum).add(descLogging); + cmdlineOptions.add(descGeneric).add(descServer).add(descStratum).add(descLogging).add(descDatabase); + fileOptions.add(descServer).add(descStratum).add(descLogging).add(descDatabase); store(boost::program_options::command_line_parser(argc, argv).options(cmdlineOptions).run(), sConfig.vm); notify(sConfig.vm); @@ -54,9 +75,9 @@ bool InitConfig(int argc, char *argv[]) return false; } - std::ifstream ifs(sConfig.Get("LogFilePath").c_str()); + std::ifstream ifs(sConfig.Get("config").c_str()); - if (!ifs) { + if (!ifs.is_open()) { sLog.Error(LOG_GENERAL, "Failed opening config file: %s", sConfig.Get("LogFilePath").c_str()); return false; } diff --git a/src/server/poolserver/Server/Server.cpp b/src/server/poolserver/Server/Server.cpp index 96b66e4..a41afef 100644 --- a/src/server/poolserver/Server/Server.cpp +++ b/src/server/poolserver/Server/Server.cpp @@ -2,8 +2,10 @@ #include "Config.h" #include "Log.h" #include "Stratum/Server.h" +#include "ServerDatabaseEnv.h" #include +#include #include Server::Server() : serverLoops(0) @@ -18,6 +20,11 @@ Server::~Server() int Server::Run() { sLog.Info(LOG_SERVER, "Server is starting..."); + + InitDatabase(); + + sDatabase.Execute("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')"); + sDatabase.ExecuteAsync("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')"); // Start stratum server sLog.Info(LOG_SERVER, "Starting stratum"); @@ -59,6 +66,8 @@ int Server::Run() } sLog.Info(LOG_SERVER, "Server is stopping..."); + + sDatabase.Close(); return exitcode; } @@ -67,3 +76,20 @@ void Server::Update(uint32_t diff) { } + +bool Server::InitDatabase() +{ + if (boost::iequals(sConfig.Get("DatabaseDriver"), "mysql")) { + MySQLConnectionInfo connInfo; + connInfo.Host = sConfig.Get("MySQLHost"); + connInfo.Port = sConfig.Get("MySQLPort"); + connInfo.User = sConfig.Get("MySQLUser"); + connInfo.Pass = sConfig.Get("MySQLPass"); + connInfo.DB = sConfig.Get("MySQLDatabase"); + return sDatabase.Open(connInfo, sConfig.Get("MySQLSyncThreads"), sConfig.Get("MySQLAsyncThreads")); + } else { + sLog.Error(LOG_SERVER, "Database Driver '%s' not found.", sConfig.Get("DatabaseDriver").c_str()); + return false; + } +} + diff --git a/src/server/poolserver/Server/Server.h b/src/server/poolserver/Server/Server.h index 463e4ee..941803a 100644 --- a/src/server/poolserver/Server/Server.h +++ b/src/server/poolserver/Server/Server.h @@ -23,6 +23,8 @@ public: int Run(); void Update(uint32_t); + + bool InitDatabase(); }; #endif diff --git a/src/server/shared/Database/CMakeLists.txt b/src/server/shared/Database/CMakeLists.txt index 3ab3788..b3ae7eb 100644 --- a/src/server/shared/Database/CMakeLists.txt +++ b/src/server/shared/Database/CMakeLists.txt @@ -1,9 +1,10 @@ -file(GLOB sources_MySQL MySQL/*.cpp MySQL/*.h) +file(GLOB_RECURSE sources_MySQL MySQL/*.cpp MySQL/*.h) file(GLOB sources_localdir *.cpp *.h) set(sources_Database ${sources_localdir} + PARENT_SCOPE ) if(MYSQL) @@ -11,5 +12,6 @@ if(MYSQL) ${sources_Database} ${sources_MySQL} ${MYSQL_INCLUDE_DIR} + PARENT_SCOPE ) endif() diff --git a/src/server/shared/Database/Database.cpp b/src/server/shared/Database/Database.cpp deleted file mode 100644 index e69de29..0000000 diff --git a/src/server/shared/Database/Database.h b/src/server/shared/Database/Database.h index dfd6e4f..8ce5ae0 100644 --- a/src/server/shared/Database/Database.h +++ b/src/server/shared/Database/Database.h @@ -1,33 +1,30 @@ #ifndef DATABASE_H_ #define DATABASE_H_ -class Database; -class ResultSet; -class Fields; -class PreparedStatement; - -enum PreparedStatementEnum -{ - STMT_INSERT_SHARE = 1, -}; +#include "QueryResult.h" +#include "PreparedStatement.h" +#include "DatabaseCallback.h" +#include class Database { public: - - // Ping! - virtual Ping(); - // Queries - virtual void Execute(const char* query); - virtual void Execute(PreparedStatement* stmt); - virtual ResultSet* Query(const char* query); - virtual ResultSet* Query(PreparedStatement* stmt); - + virtual bool Execute(const char* query) = 0; + virtual ResultSet* Query(const char* query) = 0; + + // Stmt + virtual bool Execute(PreparedStatement* stmt) = 0; + virtual ResultSet* Query(PreparedStatement* stmt) = 0; + + // Async + virtual bool ExecuteAsync(const char* query) = 0; + virtual bool ExecuteAsync(PreparedStatement* stmt) = 0; + virtual bool QueryAsync(DatabaseCallback callback, const char* query) = 0; + virtual bool QueryAsync(DatabaseCallback callback, PreparedStatement* stmt) = 0; + // Prepared Statements - virtual PreparedStatement* GetPreparedStatement(PreparedStatementEnum smtid); - - + virtual PreparedStatement* GetPreparedStatement(uint32_t index) = 0; }; #endif diff --git a/src/server/shared/Database/DatabaseCallback.h b/src/server/shared/Database/DatabaseCallback.h new file mode 100644 index 0000000..d687390 --- /dev/null +++ b/src/server/shared/Database/DatabaseCallback.h @@ -0,0 +1,10 @@ +#ifndef DATABASE_CALLBACK_H_ +#define DATABASE_CALLBACK_H_ + +#include "QueryResult.h" + +#include + +typedef boost::function DatabaseCallback; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/DatabaseEnv.h b/src/server/shared/Database/DatabaseEnv.h new file mode 100644 index 0000000..51230bd --- /dev/null +++ b/src/server/shared/Database/DatabaseEnv.h @@ -0,0 +1,10 @@ +#ifndef DATABASE_ENV_H_ +#define DATABASE_ENV_H_ + +#include "Database.h" + +#ifdef WITH_MYSQL +#include "MySQL/DatabaseEnvMySQL.h" +#endif + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/Field.h b/src/server/shared/Database/Field.h new file mode 100644 index 0000000..334189b --- /dev/null +++ b/src/server/shared/Database/Field.h @@ -0,0 +1,11 @@ +#ifndef FIELD_H_ +#define FIELD_H_ + +class Field +{ +public: + //template + //virtual Get(); +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp b/src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp new file mode 100644 index 0000000..22d3eaf --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp @@ -0,0 +1,149 @@ +#include "DatabaseConnectionMySQL.h" +#include "Log.h" + +DatabaseConnectionMySQL::DatabaseConnectionMySQL(MySQLConnectionInfo connInfo, DatabaseWorkQueueMySQL* asyncQueue) : +_mysql(NULL), _asyncQueue(asyncQueue), _worker(NULL), _connectionInfo(connInfo) +{ + if (_asyncQueue) { + _worker = new DatabaseWorkerMySQL(_asyncQueue, this); + Type = MYSQL_CONN_ASYNC; + } else + Type = MYSQL_CONN_SYNC; +} + +DatabaseConnectionMySQL::~DatabaseConnectionMySQL() +{ + assert(_mysql); + + for (uint32_t i = 0; i < _stmts.size(); ++i) + delete _stmts[i]; + + mysql_close(_mysql); +} + +bool DatabaseConnectionMySQL::Open() +{ + MYSQL* mysqlInit; + mysqlInit = mysql_init(NULL); + if (!mysqlInit) + { + sLog.Error(LOG_DATABASE, "Could not initialize Mysql connection to database `%s`", _connectionInfo.DB.c_str()); + return false; + } + + mysql_options(mysqlInit, MYSQL_SET_CHARSET_NAME, "utf8"); + + _mysql = mysql_real_connect(mysqlInit, _connectionInfo.Host.c_str(), _connectionInfo.User.c_str(), + _connectionInfo.Pass.c_str(), _connectionInfo.DB.c_str(), _connectionInfo.Port, NULL, 0); + + if (_mysql) + { + sLog.Info(LOG_DATABASE, "Connected to MySQL database at %s", _connectionInfo.Host.c_str()); + mysql_autocommit(_mysql, 1); + + // set connection properties to UTF8 to properly handle locales for different + // server configs - core sends data in UTF8, so MySQL must expect UTF8 too + mysql_set_character_set(_mysql, "utf8"); + + return true; + } + else + { + sLog.Error(LOG_DATABASE, "Could not connect to MySQL database at %s: %s\n", _connectionInfo.Host.c_str(), mysql_error(mysqlInit)); + mysql_close(mysqlInit); + return false; + } +} + +void DatabaseConnectionMySQL::Close() +{ + delete this; +} + +bool DatabaseConnectionMySQL::Execute(const char* query) +{ + sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Execute(): %s", query); + + if (!query || !_mysql) + return false; + + if (mysql_query(_mysql, query)) + { + uint32_t lErrno = mysql_errno(_mysql); + + sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql)); + + if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection) + return Execute(query); // Try again + + return false; + } + else + return true; +} + +ResultSetMySQL* DatabaseConnectionMySQL::Query(const char* query) +{ + sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Query(): %s", query); + + if (!query) + return NULL; + + MYSQL_RES *result = NULL; + MYSQL_FIELD *fields = NULL; + uint64_t rowCount = 0; + uint32_t fieldCount = 0; + + if (!_Query(query, result, fields, rowCount, fieldCount)) + return NULL; + + return new ResultSetMySQL(result, fields, rowCount, fieldCount); +} + +bool DatabaseConnectionMySQL::_Query(const char *query, MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t& rowCount, uint32_t& fieldCount) +{ + if (!_mysql) + return false; + + if (mysql_query(_mysql, query)) + { + uint32_t lErrno = mysql_errno(_mysql); + + sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql)); + + if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection) + return _Query(query, result, fields, rowCount, fieldCount); // We try again + + return false; + } + + result = mysql_store_result(_mysql); + rowCount = mysql_affected_rows(_mysql); + fieldCount = mysql_field_count(_mysql); + + if (!result) + return false; + + if (!rowCount) + { + mysql_free_result(result); + return false; + } + + fields = mysql_fetch_fields(result); + + return true; +} + +bool DatabaseConnectionMySQL::Execute(PreparedStatement* stmt) +{ +} + +ResultSetMySQL* DatabaseConnectionMySQL::Query(PreparedStatement* stmt) +{ +} + +bool DatabaseConnectionMySQL::_HandleMySQLErrno(uint32_t lErrno) +{ + return false; +} \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h b/src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h new file mode 100644 index 0000000..08a744a --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h @@ -0,0 +1,79 @@ +#ifndef DATABASE_CONNECTION_MYSQL_H_ +#define DATABASE_CONNECTION_MYSQL_H_ + +#include "DatabaseOperationMySQL.h" +#include "DatabaseWorkerMySQL.h" +#include "QueryResultMySQL.h" +#include "PreparedStatementMySQL.h" + +#include +#include + +enum MySQLConnectionType +{ + MYSQL_CONN_SYNC, + MYSQL_CONN_ASYNC, + MYSQL_CONN_SIZE +}; + +struct MySQLConnectionInfo +{ + std::string Host; + uint16_t Port; + std::string User; + std::string Pass; + std::string DB; +}; + +class DatabaseConnectionMySQL +{ +public: + DatabaseConnectionMySQL(MySQLConnectionInfo connInfo, DatabaseWorkQueueMySQL* asyncQueue = NULL); + ~DatabaseConnectionMySQL(); + + bool Open(); + void Close(); + + // Ping! + void Ping() + { + mysql_ping(_mysql); + } + + // Queries + bool Execute(const char* query); + ResultSetMySQL* Query(const char* query); + + // Stmt + bool Execute(PreparedStatement* stmt); + ResultSetMySQL* Query(PreparedStatement* stmt); + + // Locking + bool LockIfReady() + { + return _mutex.try_lock(); + } + + void Unlock() + { + _mutex.unlock(); + } + + MySQLConnectionType Type; + + void PrepareStatement(uint32_t index, const char* query); + +private: + bool _Query(const char *sql, MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t& pRowCount, uint32_t& pFieldCount); + + bool _HandleMySQLErrno(uint32_t lErrno); + + boost::mutex _mutex; + MYSQL* _mysql; + DatabaseWorkQueueMySQL* _asyncQueue; + DatabaseWorkerMySQL* _worker; + std::vector _stmts; + MySQLConnectionInfo _connectionInfo; +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseEnvMySQL.h b/src/server/shared/Database/MySQL/DatabaseEnvMySQL.h new file mode 100644 index 0000000..09a1fd7 --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseEnvMySQL.h @@ -0,0 +1,6 @@ +#ifndef DATABASE_ENV_MYSQL_H_ +#define DATABASE_ENV_MYSQL_H_ + +#include "DatabaseWorkerPoolMySQL.h" + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp b/src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp new file mode 100644 index 0000000..20069a0 --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp @@ -0,0 +1,26 @@ +#include "DatabaseOperationMySQL.h" +#include "DatabaseConnectionMySQL.h" + +void DatabasePingOperationMySQL::Execute() +{ + _conn->Ping(); +} + +void DatabasePreparedStatementOperationMySQL::Execute() +{ + if (_callback) { + ResultSetMySQL* result = _conn->Query(_stmt); + _callback(result); + } else + _conn->Execute(_stmt); +} + + +void DatabaseQueryOperationMySQL::Execute() +{ + if (_callback) { + ResultSetMySQL* result = _conn->Query(_query); + _callback(result); + } else + _conn->Execute(_query); +} \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseOperationMySQL.h b/src/server/shared/Database/MySQL/DatabaseOperationMySQL.h new file mode 100644 index 0000000..86679a4 --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseOperationMySQL.h @@ -0,0 +1,52 @@ +#ifndef DATABASE_OPERATION_MYSQL_H_ +#define DATABASE_OPERATION_MYSQL_H_ + +#include "Util.h" +#include "DatabaseCallback.h" +#include "PreparedStatement.h" +#include + +class DatabaseConnectionMySQL; + +class DatabaseOperationMySQL +{ +public: + DatabaseOperationMySQL(): _conn(NULL) {} + virtual void Execute() = 0; + void SetConnection(DatabaseConnectionMySQL* conn) { _conn = conn; } +protected: + DatabaseConnectionMySQL* _conn; +}; + +class DatabasePingOperationMySQL : public DatabaseOperationMySQL +{ + void Execute(); +}; + +class DatabasePreparedStatementOperationMySQL : public DatabaseOperationMySQL +{ +public: + DatabasePreparedStatementOperationMySQL(PreparedStatement* stmt, DatabaseCallback callback = NULL): DatabaseOperationMySQL(), _stmt(stmt), _callback(callback) {}; + + void Execute(); + +private: + DatabaseCallback _callback; + PreparedStatement* _stmt; +}; + +class DatabaseQueryOperationMySQL : public DatabaseOperationMySQL +{ +public: + DatabaseQueryOperationMySQL(const char* query, DatabaseCallback callback = NULL): DatabaseOperationMySQL(), _query(query), _callback(callback) {}; + + void Execute(); + +private: + DatabaseCallback _callback; + const char* _query; +}; + +typedef Util::SynchronisedQueue DatabaseWorkQueueMySQL; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp b/src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp new file mode 100644 index 0000000..b1d4b55 --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp @@ -0,0 +1,36 @@ +#include "DatabaseWorkerMySQL.h" +#include "Log.h" + +DatabaseWorkerMySQL::DatabaseWorkerMySQL(DatabaseWorkQueueMySQL* asyncQueue, DatabaseConnectionMySQL* conn) : +_asyncQueue(asyncQueue), _conn(conn) +{ + Activate(); +} + +void DatabaseWorkerMySQL::Work() +{ + sLog.Debug(LOG_DATABASE, "Database worker thread started"); + + if (!this->_asyncQueue) + return; + + DatabaseOperationMySQL* op = NULL; + for (;;) + { + op = this->_asyncQueue->Dequeue(); + + if (!op) { + sLog.Debug(LOG_DATABASE, "Database worker thread exiting..."); + return; + } + + sLog.Debug(LOG_DATABASE, "Database worker working..."); + + op->SetConnection(_conn); + op->Execute(); + + delete op; + + sLog.Debug(LOG_DATABASE, "Database worker finished!"); + } +} diff --git a/src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h b/src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h new file mode 100644 index 0000000..ab90f70 --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h @@ -0,0 +1,19 @@ +#ifndef DATABASE_WORKER_MYSQL_H_ +#define DATABASE_WORKER_MYSQL_H_ + +#include "Util.h" +#include "DatabaseOperationMySQL.h" + +class DatabaseConnectionMySQL; + +class DatabaseWorkerMySQL : public Util::GenericWorker +{ +public: + DatabaseWorkerMySQL(DatabaseWorkQueueMySQL* asyncQueue, DatabaseConnectionMySQL* conn); + void Work(); +private: + DatabaseWorkQueueMySQL* _asyncQueue; + DatabaseConnectionMySQL* _conn; +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h b/src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h new file mode 100644 index 0000000..6069c2c --- /dev/null +++ b/src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h @@ -0,0 +1,158 @@ +#ifndef DATABASE_WORKER_POOL_MYSQL_H_ +#define DATABASE_WORKER_POOL_MYSQL_H_ + +#include "Database.h" +#include "DatabaseConnectionMySQL.h" +#include "QueryResultMySQL.h" +#include "Log.h" + +#include + +class DatabaseWorkerPoolMySQL : public Database +{ +public: + DatabaseWorkerPoolMySQL() : _asyncQueue(new DatabaseWorkQueueMySQL()) { } + ~DatabaseWorkerPoolMySQL() + { + delete _asyncQueue; + } + + bool Open(MySQLConnectionInfo connInfo, uint8_t syncThreads, uint8_t asyncThreads) + { + bool res = true; + _connectionInfo = connInfo; + + sLog.Info(LOG_DATABASE, "Opening MySQL Database Pool '%s'. Asynchronous threads: %u, synchronous threads: %u.", _connectionInfo.DB.c_str(), asyncThreads, syncThreads); + + for (uint8_t i = 0; i < syncThreads; ++i) + { + DatabaseConnectionMySQL* conn = new DatabaseConnectionMySQL(_connectionInfo); + res &= conn->Open(); + _connections[MYSQL_CONN_SYNC].push_back(conn); + } + + for (uint8_t i = 0; i < syncThreads; ++i) + { + DatabaseConnectionMySQL* conn = new DatabaseConnectionMySQL(_connectionInfo, _asyncQueue); + res &= conn->Open(); + _connections[MYSQL_CONN_ASYNC].push_back(conn); + } + + if (res) + sLog.Info(LOG_DATABASE, "MySQL Database Pool '%s' opened successfully. %u total connections.", _connectionInfo.DB.c_str(), _connections[MYSQL_CONN_SYNC].size()+_connections[MYSQL_CONN_ASYNC].size()); + else + sLog.Error(LOG_DATABASE, "Failed opening MySQL Database Pool to '%s'.", _connectionInfo.DB.c_str()); + + return res; + } + + void Close() + { + sLog.Info(LOG_DATABASE, "Closing MySQL Database Pool '%s'.", _connectionInfo.DB.c_str()); + + // Stop worker threads + _asyncQueue->Stop(); + + for (uint8_t i = 0; i < _connections[MYSQL_CONN_SYNC].size(); ++i) + { + DatabaseConnectionMySQL* conn = _connections[MYSQL_CONN_SYNC][i]; + conn->Close(); + } + + for (uint8_t i = 0; i < _connections[MYSQL_CONN_ASYNC].size(); ++i) + { + DatabaseConnectionMySQL* conn = _connections[MYSQL_CONN_ASYNC][i]; + conn->Close(); + } + + sLog.Info(LOG_DATABASE, "Closed all connections to MySQL Database Pool '%s'.", _connectionInfo.DB.c_str()); + } + + // Queries + bool Execute(const char* query) + { + DatabaseConnectionMySQL* conn = GetSyncConnection(); + bool result = conn->Execute(query); + conn->Unlock(); + return result; + } + ResultSetMySQL* Query(const char* query) + { + DatabaseConnectionMySQL* conn = GetSyncConnection(); + ResultSetMySQL* result = conn->Query(query); + conn->Unlock(); + return result; + } + + // Stmt + bool Execute(PreparedStatement* stmt) + { + DatabaseConnectionMySQL* conn = GetSyncConnection(); + bool result = conn->Execute(stmt); + conn->Unlock(); + return result; + } + ResultSetMySQL* Query(PreparedStatement* stmt) + { + DatabaseConnectionMySQL* conn = GetSyncConnection(); + ResultSetMySQL* result = conn->Query(stmt); + conn->Unlock(); + return result; + } + + // Async + bool ExecuteAsync(const char* query) + { + DatabaseQueryOperationMySQL* op = new DatabaseQueryOperationMySQL(query); + _asyncQueue->Enqueue(op); + return true; + } + bool ExecuteAsync(PreparedStatement* stmt) + { + DatabasePreparedStatementOperationMySQL* op = new DatabasePreparedStatementOperationMySQL(stmt); + _asyncQueue->Enqueue(op); + return true; + } + bool QueryAsync(DatabaseCallback callback, const char* query) + { + DatabaseQueryOperationMySQL* op = new DatabaseQueryOperationMySQL(query, callback); + _asyncQueue->Enqueue(op); + return true; + } + bool QueryAsync(DatabaseCallback callback, PreparedStatement* stmt) + { + DatabasePreparedStatementOperationMySQL* op = new DatabasePreparedStatementOperationMySQL(stmt, callback); + _asyncQueue->Enqueue(op); + return true; + } + + // Prepared Statements + PreparedStatement* GetPreparedStatement(uint32_t stmtid) + { + return NULL;//new PreparedStatement(stmtid); + } + +private: + DatabaseConnectionMySQL* GetSyncConnection() + { + uint32_t i; + uint8_t conn_size = _connections[MYSQL_CONN_SYNC].size(); + DatabaseConnectionMySQL* conn = NULL; + + // Block until we find a free connection + for (;;) + { + conn = _connections[MYSQL_CONN_SYNC][++i % conn_size]; + if (conn->LockIfReady()) + break; + } + + return conn; + } + + std::vector _connections[MYSQL_CONN_SIZE]; + MySQLConnectionInfo _connectionInfo; + DatabaseWorkQueueMySQL* _asyncQueue; +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/FieldMySQL.h b/src/server/shared/Database/MySQL/FieldMySQL.h new file mode 100644 index 0000000..87395f3 --- /dev/null +++ b/src/server/shared/Database/MySQL/FieldMySQL.h @@ -0,0 +1,10 @@ +#ifndef FIELD_MYSQL_H_ +#define FIELD_MYSQL_H_ + +#include "Field.h" + +class FieldMySQL : public Field +{ +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp b/src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp new file mode 100644 index 0000000..c98458a --- /dev/null +++ b/src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp @@ -0,0 +1,34 @@ +#include "PreparedStatementMySQL.h" +#include + +PreparedStatementMySQL::PreparedStatementMySQL(MYSQL_STMT* stmt) : +_stmt(stmt), _bind(NULL), PreparedStatement(0) +{ + _paramCount = mysql_stmt_param_count(stmt); + _bind = new MYSQL_BIND[_paramCount]; + memset(_bind, 0, sizeof(MYSQL_BIND)*_paramCount); +} + +PreparedStatementMySQL::~PreparedStatementMySQL() +{ + ClearParameters(); + if (_stmt->bind_result_done) + { + delete[] _stmt->bind->length; + delete[] _stmt->bind->is_null; + } + mysql_stmt_close(_stmt); + delete[] _bind; + this->~PreparedStatement(); +} + +void PreparedStatementMySQL::ClearParameters() +{ + for (uint8_t i = 0; i < _paramCount; ++i) + { + delete _bind[i].length; + _bind[i].length = NULL; + delete[] (char*) _bind[i].buffer; + _bind[i].buffer = NULL; + } +} \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/PreparedStatementMySQL.h b/src/server/shared/Database/MySQL/PreparedStatementMySQL.h new file mode 100644 index 0000000..297cdb7 --- /dev/null +++ b/src/server/shared/Database/MySQL/PreparedStatementMySQL.h @@ -0,0 +1,24 @@ +#ifndef PREPARED_STATEMENT_MYSQL_H_ +#define PREPARED_STATEMENT_MYSQL_H_ + +#include "PreparedStatement.h" +#include +#include + +class PreparedStatementMySQL : public PreparedStatement +{ +public: + PreparedStatementMySQL(MYSQL_STMT* stmt); + ~PreparedStatementMySQL(); + + template + void Set(const uint8_t index, const T value); + + void ClearParameters(); +private: + MYSQL_STMT* _stmt; + MYSQL_BIND* _bind; + uint8_t _paramCount; +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/MySQL/QueryResultMySQL.cpp b/src/server/shared/Database/MySQL/QueryResultMySQL.cpp new file mode 100644 index 0000000..8524e08 --- /dev/null +++ b/src/server/shared/Database/MySQL/QueryResultMySQL.cpp @@ -0,0 +1,9 @@ +#include "QueryResultMySQL.h" + +ResultSetMySQL::ResultSetMySQL(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount) +{ +} + +ResultSetMySQL::~ResultSetMySQL() +{ +} diff --git a/src/server/shared/Database/MySQL/QueryResultMySQL.h b/src/server/shared/Database/MySQL/QueryResultMySQL.h new file mode 100644 index 0000000..a29f77a --- /dev/null +++ b/src/server/shared/Database/MySQL/QueryResultMySQL.h @@ -0,0 +1,39 @@ +#ifndef QUERY_RESULT_MYSQL_H_ +#define QUERY_RESULT_MYSQL_H_ + +#include "QueryResult.h" +#include "FieldMySQL.h" +#include +#include +#include + +class ResultSetMySQL : public ResultSet +{ +public: + ResultSetMySQL(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount); + ~ResultSetMySQL(); + + // Metadata + uint64_t GetRowCount() + { + return _rowCount; + } + + uint32_t GetFieldCount() + { + return _fieldCount; + } + + bool NextRow() {}; + Field* Fetch() {}; + +private: + uint64_t _rowCount; + FieldMySQL* _currentRow; + uint32_t _fieldCount; + void CleanUp(); + MYSQL_RES* _result; + MYSQL_FIELD* _fields; +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/PreparedStatement.h b/src/server/shared/Database/PreparedStatement.h new file mode 100644 index 0000000..bc1fda6 --- /dev/null +++ b/src/server/shared/Database/PreparedStatement.h @@ -0,0 +1,12 @@ +#ifndef PREPARED_STATEMENT_H_ +#define PREPARED_STATEMENT_H_ + +#include + +class PreparedStatement +{ +public: + PreparedStatement(uint32_t index) {}; +}; + +#endif \ No newline at end of file diff --git a/src/server/shared/Database/QueryResult.h b/src/server/shared/Database/QueryResult.h new file mode 100644 index 0000000..d85a23f --- /dev/null +++ b/src/server/shared/Database/QueryResult.h @@ -0,0 +1,21 @@ +#ifndef RESULTSET_H_ +#define RESULTSET_H_ + +#include "Field.h" +#include +#include + +class ResultSet +{ +public: + // Metadata + virtual uint64_t GetRowCount() = 0; + virtual uint32_t GetFieldCount() = 0; + + virtual bool NextRow() = 0; + virtual Field* Fetch() = 0; +}; + +typedef boost::shared_ptr QueryResult; + +#endif \ No newline at end of file diff --git a/src/server/shared/Logging/Log.cpp b/src/server/shared/Logging/Log.cpp index e8a3411..681fb70 100644 --- a/src/server/shared/Logging/Log.cpp +++ b/src/server/shared/Logging/Log.cpp @@ -72,6 +72,8 @@ void Log::OpenLogFile(std::string path) void Log::Write(LogLevel level, LogType type, std::string msg) { + boost::lock_guard lock(_mutex); + switch(level) { case LOG_LEVEL_ERROR: diff --git a/src/server/shared/Logging/Log.h b/src/server/shared/Logging/Log.h index ae3c7dd..cd3e6b4 100644 --- a/src/server/shared/Logging/Log.h +++ b/src/server/shared/Logging/Log.h @@ -7,6 +7,7 @@ #include #include #include +#include #define MAX_MSG_LEN 32*1024 #define ATTR_PRINTF(F, V) __attribute__ ((format (printf, F, V))) @@ -14,7 +15,8 @@ enum LogType { LOG_GENERAL = 0, - LOG_SERVER = 2, + LOG_SERVER = 1, + LOG_DATABASE = 2 }; enum LogLevel @@ -46,6 +48,8 @@ private: std::string logfileloc; std::ofstream logfile; + + boost::mutex _mutex; }; extern Log sLog; diff --git a/src/server/shared/Util.cpp b/src/server/shared/Util.cpp new file mode 100644 index 0000000..31c3e9d --- /dev/null +++ b/src/server/shared/Util.cpp @@ -0,0 +1,17 @@ +#include "Util.h" + +std::string Util::Date(const char* format, bool utc) +{ + boost::posix_time::ptime now; + if (utc) + now = boost::posix_time::second_clock::universal_time(); + else + now = boost::posix_time::second_clock::local_time(); + + std::stringstream ss; + boost::posix_time::time_facet *facet = new boost::posix_time::time_facet(format); + ss.imbue(std::locale(std::cout.getloc(), facet)); + ss << now; + + return ss.str(); +} \ No newline at end of file diff --git a/src/server/shared/Util.h b/src/server/shared/Util.h index 56317c9..38b9db6 100644 --- a/src/server/shared/Util.h +++ b/src/server/shared/Util.h @@ -3,25 +3,76 @@ #include #include +#include #include +#include +#include namespace Util { - std::string Date(const char* format, bool utc = false) - { - boost::posix_time::ptime now; - if (utc) - now = boost::posix_time::second_clock::universal_time(); - else - now = boost::posix_time::second_clock::local_time(); - - std::stringstream ss; - boost::posix_time::time_facet *facet = new boost::posix_time::time_facet(format); - ss.imbue(std::locale(std::cout.getloc(), facet)); - ss << now; - - return ss.str(); - } + std::string Date(const char* format, bool utc = false); + + template + class SynchronisedQueue + { + public: + SynchronisedQueue() : __endQueue(false) {} + + void Enqueue(const T& data) + { + boost::unique_lock lock(__mutex); + + __queue.push(data); + + __cond.notify_one(); + } + + T Dequeue() + { + boost::unique_lock lock(__mutex); + + while (__queue.empty() && !__endQueue) + __cond.wait(lock); + + if (__endQueue) + return NULL; + + T result = __queue.front(); + __queue.pop(); + + return result; + } + + void Stop() + { + __endQueue = true; + __cond.notify_all(); + } + + uint32_t Size() + { + boost::unique_lock lock(__mutex); + return __queue.size(); + } + + private: + bool __endQueue; + std::queue __queue; + boost::mutex __mutex; + boost::condition_variable __cond; + }; + + class GenericWorker + { + public: + void Activate() + { + _thread = new boost::thread(boost::bind(&GenericWorker::Work, this)); + } + private: + virtual void Work() = 0; + boost::thread* _thread; + }; } #endif