Browse Source

Some changes...

peercoin
Intel 11 years ago
parent
commit
ba40a82bdc
  1. 2
      src/server/poolserver/CMakeLists.txt
  2. 4
      src/server/poolserver/Database/ServerDatabaseEnv.h
  3. 25
      src/server/poolserver/Server/Server.cpp
  4. 11
      src/server/shared/Bitcoin/BitCoinRPC.h
  5. 7
      src/server/shared/CMakeLists.txt
  6. 17
      src/server/shared/Database/CMakeLists.txt
  7. 30
      src/server/shared/Database/Database.h
  8. 10
      src/server/shared/Database/DatabaseEnv.h
  9. 11
      src/server/shared/Database/Field.h
  10. 149
      src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp
  11. 79
      src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h
  12. 6
      src/server/shared/Database/MySQL/DatabaseEnvMySQL.h
  13. 26
      src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp
  14. 52
      src/server/shared/Database/MySQL/DatabaseOperationMySQL.h
  15. 36
      src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp
  16. 19
      src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h
  17. 158
      src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h
  18. 10
      src/server/shared/Database/MySQL/FieldMySQL.h
  19. 34
      src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp
  20. 24
      src/server/shared/Database/MySQL/PreparedStatementMySQL.h
  21. 9
      src/server/shared/Database/MySQL/QueryResultMySQL.cpp
  22. 39
      src/server/shared/Database/MySQL/QueryResultMySQL.h
  23. 12
      src/server/shared/Database/PreparedStatement.h
  24. 21
      src/server/shared/Database/QueryResult.h
  25. 2
      src/server/shared/JSON/JSON.cpp
  26. 8
      src/server/shared/JSON/JSON.h
  27. 2
      src/server/shared/JSONRPC/JSONRPC.cpp
  28. 2
      src/server/shared/JSONRPC/JSONRPC.h
  29. 7
      src/server/shared/MySQL/DatabaseCallback.h
  30. 175
      src/server/shared/MySQL/DatabaseConnection.cpp
  31. 82
      src/server/shared/MySQL/DatabaseConnection.h
  32. 29
      src/server/shared/MySQL/DatabaseOperation.cpp
  33. 55
      src/server/shared/MySQL/DatabaseOperation.h
  34. 39
      src/server/shared/MySQL/DatabaseWorker.cpp
  35. 22
      src/server/shared/MySQL/DatabaseWorker.h
  36. 80
      src/server/shared/MySQL/DatabaseWorkerPool.cpp
  37. 116
      src/server/shared/MySQL/DatabaseWorkerPool.h
  38. 67
      src/server/shared/MySQL/Field.h
  39. 37
      src/server/shared/MySQL/PreparedStatement.cpp
  40. 33
      src/server/shared/MySQL/PreparedStatement.h
  41. 53
      src/server/shared/MySQL/QueryResult.cpp
  42. 49
      src/server/shared/MySQL/QueryResult.h

2
src/server/poolserver/CMakeLists.txt

@ -13,7 +13,7 @@ set(sources_Poolserver @@ -13,7 +13,7 @@ set(sources_Poolserver
include_directories(
${CMAKE_SOURCE_DIR}/src/server/shared
${CMAKE_SOURCE_DIR}/src/server/shared/Configuration
${CMAKE_SOURCE_DIR}/src/server/shared/Database
${CMAKE_SOURCE_DIR}/src/server/shared/MySQL
${CMAKE_SOURCE_DIR}/src/server/shared/Logging
${CMAKE_SOURCE_DIR}/src/server/shared/JSON
${CMAKE_SOURCE_DIR}/src/server/shared/JSONRPC

4
src/server/poolserver/Database/ServerDatabaseEnv.h

@ -1,9 +1,9 @@ @@ -1,9 +1,9 @@
#ifndef SERVER_DATABASE_ENV_H_
#define SERVER_DATABASE_ENV_H_
#include "DatabaseEnv.h"
#include "DatabaseWorkerPool.h"
class ServerDatabaseWorkerPoolMySQL : public DatabaseWorkerPoolMySQL
class ServerDatabaseWorkerPoolMySQL : public MySQL::DatabaseWorkerPool
{
};

25
src/server/poolserver/Server/Server.cpp

@ -17,14 +17,33 @@ Server::~Server() @@ -17,14 +17,33 @@ Server::~Server()
//delete stratumServer;
}
void AsyncQueryCallback(MySQL::QueryResult result)
{
sLog.Info(LOG_SERVER, "Metadata: F: %u R: %u", result->GetFieldCount(), result->GetRowCount());
while (result->NextRow()) {
MySQL::Field* fields = result->Fetch();
sLog.Info(LOG_SERVER, "Row: %i %s", fields[0].GetUInt32(),
fields[1].GetString().c_str());
}
}
int Server::Run()
{
sLog.Info(LOG_SERVER, "Server is starting...");
InitDatabase();
sDatabase.Execute("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')");
sDatabase.ExecuteAsync("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')");
//sDatabase.Execute("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')");
//sDatabase.ExecuteAsync("INSERT INTO `test_table` VALUES ('999', 'sync', '1.1')");
sDatabase.QueryAsync("SELECT * FROM `test_table`", &AsyncQueryCallback);
MySQL::QueryResult result = sDatabase.Query("SELECT * FROM `test_table`");
sLog.Info(LOG_SERVER, "Metadata: F: %u R: %u", result->GetFieldCount(), result->GetRowCount());
while (result->NextRow()) {
MySQL::Field* fields = result->Fetch();
sLog.Info(LOG_SERVER, "Row: %i %s", fields[0].GetUInt32(),
fields[1].GetString().c_str());
}
// Start stratum server
sLog.Info(LOG_SERVER, "Starting stratum");
@ -80,7 +99,7 @@ void Server::Update(uint32_t diff) @@ -80,7 +99,7 @@ void Server::Update(uint32_t diff)
bool Server::InitDatabase()
{
if (boost::iequals(sConfig.Get<std::string>("DatabaseDriver"), "mysql")) {
MySQLConnectionInfo connInfo;
MySQL::ConnectionInfo connInfo;
connInfo.Host = sConfig.Get<std::string>("MySQLHost");
connInfo.Port = sConfig.Get<uint16_t>("MySQLPort");
connInfo.User = sConfig.Get<std::string>("MySQLUser");

11
src/server/shared/Bitcoin/BitCoinRPC.h

@ -0,0 +1,11 @@ @@ -0,0 +1,11 @@
#ifndef BITCOINRPC_H_
#define BITCOINRPC_H_
#include "JSONRPC.h"
class BitCoinRPC: public JSONRPC
{
};
#endif

7
src/server/shared/CMakeLists.txt

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
add_subdirectory(Database)
file(GLOB_RECURSE sources_Configuration Configuration/*.cpp Configuration/*.h)
file(GLOB_RECURSE sources_MySQL MySQL/*.cpp MySQL/*.h)
file(GLOB_RECURSE sources_Logging Logging/*.cpp Logging/*.h)
file(GLOB_RECURSE sources_Stratum Stratum/*.cpp Stratum/*.h)
file(GLOB_RECURSE sources_JSON JSON/*.cpp JSON/*.h)
@ -11,7 +10,7 @@ file(GLOB sources_localdir *.cpp *.h) @@ -11,7 +10,7 @@ file(GLOB sources_localdir *.cpp *.h)
set(sources_Shared
${sources_Configuration}
${sources_Database}
${sources_MySQL}
${sources_Logging}
${sources_Stratum}
${sources_JSON}
@ -22,7 +21,7 @@ set(sources_Shared @@ -22,7 +21,7 @@ set(sources_Shared
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/Configuration
${CMAKE_CURRENT_SOURCE_DIR}/Database
${CMAKE_CURRENT_SOURCE_DIR}/MySQL
${CMAKE_CURRENT_SOURCE_DIR}/Logging
${CMAKE_CURRENT_SOURCE_DIR}/JSON
${CMAKE_CURRENT_SOURCE_DIR}/JSONRPC

17
src/server/shared/Database/CMakeLists.txt

@ -1,17 +0,0 @@ @@ -1,17 +0,0 @@
file(GLOB_RECURSE sources_MySQL MySQL/*.cpp MySQL/*.h)
file(GLOB sources_localdir *.cpp *.h)
set(sources_Database
${sources_localdir}
PARENT_SCOPE
)
if(MYSQL)
set(sources_Database
${sources_Database}
${sources_MySQL}
${MYSQL_INCLUDE_DIR}
PARENT_SCOPE
)
endif()

30
src/server/shared/Database/Database.h

@ -1,30 +0,0 @@ @@ -1,30 +0,0 @@
#ifndef DATABASE_H_
#define DATABASE_H_
#include "QueryResult.h"
#include "PreparedStatement.h"
#include "DatabaseCallback.h"
#include <boost/thread.hpp>
class Database
{
public:
// Queries
virtual bool Execute(const char* query) = 0;
virtual ResultSet* Query(const char* query) = 0;
// Stmt
virtual bool Execute(PreparedStatement* stmt) = 0;
virtual ResultSet* Query(PreparedStatement* stmt) = 0;
// Async
virtual bool ExecuteAsync(const char* query) = 0;
virtual bool ExecuteAsync(PreparedStatement* stmt) = 0;
virtual bool QueryAsync(DatabaseCallback callback, const char* query) = 0;
virtual bool QueryAsync(DatabaseCallback callback, PreparedStatement* stmt) = 0;
// Prepared Statements
virtual PreparedStatement* GetPreparedStatement(uint32_t index) = 0;
};
#endif

10
src/server/shared/Database/DatabaseEnv.h

@ -1,10 +0,0 @@ @@ -1,10 +0,0 @@
#ifndef DATABASE_ENV_H_
#define DATABASE_ENV_H_
#include "Database.h"
#ifdef WITH_MYSQL
#include "MySQL/DatabaseEnvMySQL.h"
#endif
#endif

11
src/server/shared/Database/Field.h

@ -1,11 +0,0 @@ @@ -1,11 +0,0 @@
#ifndef FIELD_H_
#define FIELD_H_
class Field
{
public:
//template<class T>
//virtual Get();
};
#endif

149
src/server/shared/Database/MySQL/DatabaseConnectionMySQL.cpp

@ -1,149 +0,0 @@ @@ -1,149 +0,0 @@
#include "DatabaseConnectionMySQL.h"
#include "Log.h"
DatabaseConnectionMySQL::DatabaseConnectionMySQL(MySQLConnectionInfo connInfo, DatabaseWorkQueueMySQL* asyncQueue) :
_mysql(NULL), _asyncQueue(asyncQueue), _worker(NULL), _connectionInfo(connInfo)
{
if (_asyncQueue) {
_worker = new DatabaseWorkerMySQL(_asyncQueue, this);
Type = MYSQL_CONN_ASYNC;
} else
Type = MYSQL_CONN_SYNC;
}
DatabaseConnectionMySQL::~DatabaseConnectionMySQL()
{
assert(_mysql);
for (uint32_t i = 0; i < _stmts.size(); ++i)
delete _stmts[i];
mysql_close(_mysql);
}
bool DatabaseConnectionMySQL::Open()
{
MYSQL* mysqlInit;
mysqlInit = mysql_init(NULL);
if (!mysqlInit)
{
sLog.Error(LOG_DATABASE, "Could not initialize Mysql connection to database `%s`", _connectionInfo.DB.c_str());
return false;
}
mysql_options(mysqlInit, MYSQL_SET_CHARSET_NAME, "utf8");
_mysql = mysql_real_connect(mysqlInit, _connectionInfo.Host.c_str(), _connectionInfo.User.c_str(),
_connectionInfo.Pass.c_str(), _connectionInfo.DB.c_str(), _connectionInfo.Port, NULL, 0);
if (_mysql)
{
sLog.Info(LOG_DATABASE, "Connected to MySQL database at %s", _connectionInfo.Host.c_str());
mysql_autocommit(_mysql, 1);
// set connection properties to UTF8 to properly handle locales for different
// server configs - core sends data in UTF8, so MySQL must expect UTF8 too
mysql_set_character_set(_mysql, "utf8");
return true;
}
else
{
sLog.Error(LOG_DATABASE, "Could not connect to MySQL database at %s: %s\n", _connectionInfo.Host.c_str(), mysql_error(mysqlInit));
mysql_close(mysqlInit);
return false;
}
}
void DatabaseConnectionMySQL::Close()
{
delete this;
}
bool DatabaseConnectionMySQL::Execute(const char* query)
{
sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Execute(): %s", query);
if (!query || !_mysql)
return false;
if (mysql_query(_mysql, query))
{
uint32_t lErrno = mysql_errno(_mysql);
sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql));
if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection)
return Execute(query); // Try again
return false;
}
else
return true;
}
ResultSetMySQL* DatabaseConnectionMySQL::Query(const char* query)
{
sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Query(): %s", query);
if (!query)
return NULL;
MYSQL_RES *result = NULL;
MYSQL_FIELD *fields = NULL;
uint64_t rowCount = 0;
uint32_t fieldCount = 0;
if (!_Query(query, result, fields, rowCount, fieldCount))
return NULL;
return new ResultSetMySQL(result, fields, rowCount, fieldCount);
}
bool DatabaseConnectionMySQL::_Query(const char *query, MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t& rowCount, uint32_t& fieldCount)
{
if (!_mysql)
return false;
if (mysql_query(_mysql, query))
{
uint32_t lErrno = mysql_errno(_mysql);
sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql));
if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection)
return _Query(query, result, fields, rowCount, fieldCount); // We try again
return false;
}
result = mysql_store_result(_mysql);
rowCount = mysql_affected_rows(_mysql);
fieldCount = mysql_field_count(_mysql);
if (!result)
return false;
if (!rowCount)
{
mysql_free_result(result);
return false;
}
fields = mysql_fetch_fields(result);
return true;
}
bool DatabaseConnectionMySQL::Execute(PreparedStatement* stmt)
{
}
ResultSetMySQL* DatabaseConnectionMySQL::Query(PreparedStatement* stmt)
{
}
bool DatabaseConnectionMySQL::_HandleMySQLErrno(uint32_t lErrno)
{
return false;
}

79
src/server/shared/Database/MySQL/DatabaseConnectionMySQL.h

@ -1,79 +0,0 @@ @@ -1,79 +0,0 @@
#ifndef DATABASE_CONNECTION_MYSQL_H_
#define DATABASE_CONNECTION_MYSQL_H_
#include "DatabaseOperationMySQL.h"
#include "DatabaseWorkerMySQL.h"
#include "QueryResultMySQL.h"
#include "PreparedStatementMySQL.h"
#include <boost/thread.hpp>
#include <mysql.h>
enum MySQLConnectionType
{
MYSQL_CONN_SYNC,
MYSQL_CONN_ASYNC,
MYSQL_CONN_SIZE
};
struct MySQLConnectionInfo
{
std::string Host;
uint16_t Port;
std::string User;
std::string Pass;
std::string DB;
};
class DatabaseConnectionMySQL
{
public:
DatabaseConnectionMySQL(MySQLConnectionInfo connInfo, DatabaseWorkQueueMySQL* asyncQueue = NULL);
~DatabaseConnectionMySQL();
bool Open();
void Close();
// Ping!
void Ping()
{
mysql_ping(_mysql);
}
// Queries
bool Execute(const char* query);
ResultSetMySQL* Query(const char* query);
// Stmt
bool Execute(PreparedStatement* stmt);
ResultSetMySQL* Query(PreparedStatement* stmt);
// Locking
bool LockIfReady()
{
return _mutex.try_lock();
}
void Unlock()
{
_mutex.unlock();
}
MySQLConnectionType Type;
void PrepareStatement(uint32_t index, const char* query);
private:
bool _Query(const char *sql, MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t& pRowCount, uint32_t& pFieldCount);
bool _HandleMySQLErrno(uint32_t lErrno);
boost::mutex _mutex;
MYSQL* _mysql;
DatabaseWorkQueueMySQL* _asyncQueue;
DatabaseWorkerMySQL* _worker;
std::vector<PreparedStatementMySQL*> _stmts;
MySQLConnectionInfo _connectionInfo;
};
#endif

6
src/server/shared/Database/MySQL/DatabaseEnvMySQL.h

@ -1,6 +0,0 @@ @@ -1,6 +0,0 @@
#ifndef DATABASE_ENV_MYSQL_H_
#define DATABASE_ENV_MYSQL_H_
#include "DatabaseWorkerPoolMySQL.h"
#endif

26
src/server/shared/Database/MySQL/DatabaseOperationMySQL.cpp

@ -1,26 +0,0 @@ @@ -1,26 +0,0 @@
#include "DatabaseOperationMySQL.h"
#include "DatabaseConnectionMySQL.h"
void DatabasePingOperationMySQL::Execute()
{
_conn->Ping();
}
void DatabasePreparedStatementOperationMySQL::Execute()
{
if (_callback) {
ResultSetMySQL* result = _conn->Query(_stmt);
_callback(result);
} else
_conn->Execute(_stmt);
}
void DatabaseQueryOperationMySQL::Execute()
{
if (_callback) {
ResultSetMySQL* result = _conn->Query(_query);
_callback(result);
} else
_conn->Execute(_query);
}

52
src/server/shared/Database/MySQL/DatabaseOperationMySQL.h

@ -1,52 +0,0 @@ @@ -1,52 +0,0 @@
#ifndef DATABASE_OPERATION_MYSQL_H_
#define DATABASE_OPERATION_MYSQL_H_
#include "Util.h"
#include "DatabaseCallback.h"
#include "PreparedStatement.h"
#include <boost/bind.hpp>
class DatabaseConnectionMySQL;
class DatabaseOperationMySQL
{
public:
DatabaseOperationMySQL(): _conn(NULL) {}
virtual void Execute() = 0;
void SetConnection(DatabaseConnectionMySQL* conn) { _conn = conn; }
protected:
DatabaseConnectionMySQL* _conn;
};
class DatabasePingOperationMySQL : public DatabaseOperationMySQL
{
void Execute();
};
class DatabasePreparedStatementOperationMySQL : public DatabaseOperationMySQL
{
public:
DatabasePreparedStatementOperationMySQL(PreparedStatement* stmt, DatabaseCallback callback = NULL): DatabaseOperationMySQL(), _stmt(stmt), _callback(callback) {};
void Execute();
private:
DatabaseCallback _callback;
PreparedStatement* _stmt;
};
class DatabaseQueryOperationMySQL : public DatabaseOperationMySQL
{
public:
DatabaseQueryOperationMySQL(const char* query, DatabaseCallback callback = NULL): DatabaseOperationMySQL(), _query(query), _callback(callback) {};
void Execute();
private:
DatabaseCallback _callback;
const char* _query;
};
typedef Util::SynchronisedQueue<DatabaseOperationMySQL*> DatabaseWorkQueueMySQL;
#endif

36
src/server/shared/Database/MySQL/DatabaseWorkerMySQL.cpp

@ -1,36 +0,0 @@ @@ -1,36 +0,0 @@
#include "DatabaseWorkerMySQL.h"
#include "Log.h"
DatabaseWorkerMySQL::DatabaseWorkerMySQL(DatabaseWorkQueueMySQL* asyncQueue, DatabaseConnectionMySQL* conn) :
_asyncQueue(asyncQueue), _conn(conn)
{
Activate();
}
void DatabaseWorkerMySQL::Work()
{
sLog.Debug(LOG_DATABASE, "Database worker thread started");
if (!this->_asyncQueue)
return;
DatabaseOperationMySQL* op = NULL;
for (;;)
{
op = this->_asyncQueue->Dequeue();
if (!op) {
sLog.Debug(LOG_DATABASE, "Database worker thread exiting...");
return;
}
sLog.Debug(LOG_DATABASE, "Database worker working...");
op->SetConnection(_conn);
op->Execute();
delete op;
sLog.Debug(LOG_DATABASE, "Database worker finished!");
}
}

19
src/server/shared/Database/MySQL/DatabaseWorkerMySQL.h

@ -1,19 +0,0 @@ @@ -1,19 +0,0 @@
#ifndef DATABASE_WORKER_MYSQL_H_
#define DATABASE_WORKER_MYSQL_H_
#include "Util.h"
#include "DatabaseOperationMySQL.h"
class DatabaseConnectionMySQL;
class DatabaseWorkerMySQL : public Util::GenericWorker
{
public:
DatabaseWorkerMySQL(DatabaseWorkQueueMySQL* asyncQueue, DatabaseConnectionMySQL* conn);
void Work();
private:
DatabaseWorkQueueMySQL* _asyncQueue;
DatabaseConnectionMySQL* _conn;
};
#endif

158
src/server/shared/Database/MySQL/DatabaseWorkerPoolMySQL.h

@ -1,158 +0,0 @@ @@ -1,158 +0,0 @@
#ifndef DATABASE_WORKER_POOL_MYSQL_H_
#define DATABASE_WORKER_POOL_MYSQL_H_
#include "Database.h"
#include "DatabaseConnectionMySQL.h"
#include "QueryResultMySQL.h"
#include "Log.h"
#include <vector>
class DatabaseWorkerPoolMySQL : public Database
{
public:
DatabaseWorkerPoolMySQL() : _asyncQueue(new DatabaseWorkQueueMySQL()) { }
~DatabaseWorkerPoolMySQL()
{
delete _asyncQueue;
}
bool Open(MySQLConnectionInfo connInfo, uint8_t syncThreads, uint8_t asyncThreads)
{
bool res = true;
_connectionInfo = connInfo;
sLog.Info(LOG_DATABASE, "Opening MySQL Database Pool '%s'. Asynchronous threads: %u, synchronous threads: %u.", _connectionInfo.DB.c_str(), asyncThreads, syncThreads);
for (uint8_t i = 0; i < syncThreads; ++i)
{
DatabaseConnectionMySQL* conn = new DatabaseConnectionMySQL(_connectionInfo);
res &= conn->Open();
_connections[MYSQL_CONN_SYNC].push_back(conn);
}
for (uint8_t i = 0; i < syncThreads; ++i)
{
DatabaseConnectionMySQL* conn = new DatabaseConnectionMySQL(_connectionInfo, _asyncQueue);
res &= conn->Open();
_connections[MYSQL_CONN_ASYNC].push_back(conn);
}
if (res)
sLog.Info(LOG_DATABASE, "MySQL Database Pool '%s' opened successfully. %u total connections.", _connectionInfo.DB.c_str(), _connections[MYSQL_CONN_SYNC].size()+_connections[MYSQL_CONN_ASYNC].size());
else
sLog.Error(LOG_DATABASE, "Failed opening MySQL Database Pool to '%s'.", _connectionInfo.DB.c_str());
return res;
}
void Close()
{
sLog.Info(LOG_DATABASE, "Closing MySQL Database Pool '%s'.", _connectionInfo.DB.c_str());
// Stop worker threads
_asyncQueue->Stop();
for (uint8_t i = 0; i < _connections[MYSQL_CONN_SYNC].size(); ++i)
{
DatabaseConnectionMySQL* conn = _connections[MYSQL_CONN_SYNC][i];
conn->Close();
}
for (uint8_t i = 0; i < _connections[MYSQL_CONN_ASYNC].size(); ++i)
{
DatabaseConnectionMySQL* conn = _connections[MYSQL_CONN_ASYNC][i];
conn->Close();
}
sLog.Info(LOG_DATABASE, "Closed all connections to MySQL Database Pool '%s'.", _connectionInfo.DB.c_str());
}
// Queries
bool Execute(const char* query)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
bool result = conn->Execute(query);
conn->Unlock();
return result;
}
ResultSetMySQL* Query(const char* query)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
ResultSetMySQL* result = conn->Query(query);
conn->Unlock();
return result;
}
// Stmt
bool Execute(PreparedStatement* stmt)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
bool result = conn->Execute(stmt);
conn->Unlock();
return result;
}
ResultSetMySQL* Query(PreparedStatement* stmt)
{
DatabaseConnectionMySQL* conn = GetSyncConnection();
ResultSetMySQL* result = conn->Query(stmt);
conn->Unlock();
return result;
}
// Async
bool ExecuteAsync(const char* query)
{
DatabaseQueryOperationMySQL* op = new DatabaseQueryOperationMySQL(query);
_asyncQueue->Enqueue(op);
return true;
}
bool ExecuteAsync(PreparedStatement* stmt)
{
DatabasePreparedStatementOperationMySQL* op = new DatabasePreparedStatementOperationMySQL(stmt);
_asyncQueue->Enqueue(op);
return true;
}
bool QueryAsync(DatabaseCallback callback, const char* query)
{
DatabaseQueryOperationMySQL* op = new DatabaseQueryOperationMySQL(query, callback);
_asyncQueue->Enqueue(op);
return true;
}
bool QueryAsync(DatabaseCallback callback, PreparedStatement* stmt)
{
DatabasePreparedStatementOperationMySQL* op = new DatabasePreparedStatementOperationMySQL(stmt, callback);
_asyncQueue->Enqueue(op);
return true;
}
// Prepared Statements
PreparedStatement* GetPreparedStatement(uint32_t stmtid)
{
return NULL;//new PreparedStatement(stmtid);
}
private:
DatabaseConnectionMySQL* GetSyncConnection()
{
uint32_t i;
uint8_t conn_size = _connections[MYSQL_CONN_SYNC].size();
DatabaseConnectionMySQL* conn = NULL;
// Block until we find a free connection
for (;;)
{
conn = _connections[MYSQL_CONN_SYNC][++i % conn_size];
if (conn->LockIfReady())
break;
}
return conn;
}
std::vector<DatabaseConnectionMySQL*> _connections[MYSQL_CONN_SIZE];
MySQLConnectionInfo _connectionInfo;
DatabaseWorkQueueMySQL* _asyncQueue;
};
#endif

10
src/server/shared/Database/MySQL/FieldMySQL.h

@ -1,10 +0,0 @@ @@ -1,10 +0,0 @@
#ifndef FIELD_MYSQL_H_
#define FIELD_MYSQL_H_
#include "Field.h"
class FieldMySQL : public Field
{
};
#endif

34
src/server/shared/Database/MySQL/PreparedStatementMySQL.cpp

@ -1,34 +0,0 @@ @@ -1,34 +0,0 @@
#include "PreparedStatementMySQL.h"
#include <cstring>
PreparedStatementMySQL::PreparedStatementMySQL(MYSQL_STMT* stmt) :
_stmt(stmt), _bind(NULL), PreparedStatement(0)
{
_paramCount = mysql_stmt_param_count(stmt);
_bind = new MYSQL_BIND[_paramCount];
memset(_bind, 0, sizeof(MYSQL_BIND)*_paramCount);
}
PreparedStatementMySQL::~PreparedStatementMySQL()
{
ClearParameters();
if (_stmt->bind_result_done)
{
delete[] _stmt->bind->length;
delete[] _stmt->bind->is_null;
}
mysql_stmt_close(_stmt);
delete[] _bind;
this->~PreparedStatement();
}
void PreparedStatementMySQL::ClearParameters()
{
for (uint8_t i = 0; i < _paramCount; ++i)
{
delete _bind[i].length;
_bind[i].length = NULL;
delete[] (char*) _bind[i].buffer;
_bind[i].buffer = NULL;
}
}

24
src/server/shared/Database/MySQL/PreparedStatementMySQL.h

@ -1,24 +0,0 @@ @@ -1,24 +0,0 @@
#ifndef PREPARED_STATEMENT_MYSQL_H_
#define PREPARED_STATEMENT_MYSQL_H_
#include "PreparedStatement.h"
#include <boost/cstdint.hpp>
#include <mysql.h>
class PreparedStatementMySQL : public PreparedStatement
{
public:
PreparedStatementMySQL(MYSQL_STMT* stmt);
~PreparedStatementMySQL();
template<class T>
void Set(const uint8_t index, const T value);
void ClearParameters();
private:
MYSQL_STMT* _stmt;
MYSQL_BIND* _bind;
uint8_t _paramCount;
};
#endif

9
src/server/shared/Database/MySQL/QueryResultMySQL.cpp

@ -1,9 +0,0 @@ @@ -1,9 +0,0 @@
#include "QueryResultMySQL.h"
ResultSetMySQL::ResultSetMySQL(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount)
{
}
ResultSetMySQL::~ResultSetMySQL()
{
}

39
src/server/shared/Database/MySQL/QueryResultMySQL.h

@ -1,39 +0,0 @@ @@ -1,39 +0,0 @@
#ifndef QUERY_RESULT_MYSQL_H_
#define QUERY_RESULT_MYSQL_H_
#include "QueryResult.h"
#include "FieldMySQL.h"
#include <boost/shared_ptr.hpp>
#include <cassert>
#include <mysql.h>
class ResultSetMySQL : public ResultSet
{
public:
ResultSetMySQL(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount);
~ResultSetMySQL();
// Metadata
uint64_t GetRowCount()
{
return _rowCount;
}
uint32_t GetFieldCount()
{
return _fieldCount;
}
bool NextRow() {};
Field* Fetch() {};
private:
uint64_t _rowCount;
FieldMySQL* _currentRow;
uint32_t _fieldCount;
void CleanUp();
MYSQL_RES* _result;
MYSQL_FIELD* _fields;
};
#endif

12
src/server/shared/Database/PreparedStatement.h

@ -1,12 +0,0 @@ @@ -1,12 +0,0 @@
#ifndef PREPARED_STATEMENT_H_
#define PREPARED_STATEMENT_H_
#include <boost/cstdint.hpp>
class PreparedStatement
{
public:
PreparedStatement(uint32_t index) {};
};
#endif

21
src/server/shared/Database/QueryResult.h

@ -1,21 +0,0 @@ @@ -1,21 +0,0 @@
#ifndef RESULTSET_H_
#define RESULTSET_H_
#include "Field.h"
#include <boost/shared_ptr.hpp>
#include <boost/cstdint.hpp>
class ResultSet
{
public:
// Metadata
virtual uint64_t GetRowCount() = 0;
virtual uint32_t GetFieldCount() = 0;
virtual bool NextRow() = 0;
virtual Field* Fetch() = 0;
};
typedef boost::shared_ptr<ResultSet> QueryResult;
#endif

2
src/server/shared/JSON/JSON.cpp

@ -19,4 +19,4 @@ std::string JSON::ToString() @@ -19,4 +19,4 @@ std::string JSON::ToString()
std::stringstream ss;
boost::property_tree::json_parser::write_json(ss, *pt);
return ss.str();
}
}

8
src/server/shared/JSON/JSON.h

@ -77,4 +77,10 @@ inline void JSON::Set<JSON>(std::string key, JSON value) @@ -77,4 +77,10 @@ inline void JSON::Set<JSON>(std::string key, JSON value)
pt->put_child(key, *value.pt);
}
#endif
template<>
inline void JSON::Add<JSON>(JSON value)
{
pt->push_back(std::make_pair("", *value.pt));
}
#endif

2
src/server/shared/JSONRPC/JSONRPC.cpp

@ -124,4 +124,4 @@ JSON JSONRPC::Query(std::string method, JSON params) @@ -124,4 +124,4 @@ JSON JSONRPC::Query(std::string method, JSON params)
sLog.Debug(LOG_JSONRPC, "JSONRPC::Query(): JSON Response: %s", jsonresponse.c_str());
return JSON::FromString(jsonresponse);
}
}

2
src/server/shared/JSONRPC/JSONRPC.h

@ -31,4 +31,4 @@ private: @@ -31,4 +31,4 @@ private:
boost::asio::ip::tcp::endpoint _ep;
};
#endif
#endif

7
src/server/shared/Database/DatabaseCallback.h → src/server/shared/MySQL/DatabaseCallback.h

@ -5,6 +5,9 @@ @@ -5,6 +5,9 @@
#include <boost/function.hpp>
typedef boost::function<void(ResultSet*)> DatabaseCallback;
namespace MySQL
{
typedef boost::function<void(QueryResult)> DatabaseCallback;
}
#endif
#endif

175
src/server/shared/MySQL/DatabaseConnection.cpp

@ -0,0 +1,175 @@ @@ -0,0 +1,175 @@
#include "DatabaseConnection.h"
#include "Log.h"
namespace MySQL
{
DatabaseConnection::DatabaseConnection(ConnectionInfo connInfo, DatabaseWorkQueue* asyncQueue) :
_mysql(NULL), _asyncQueue(asyncQueue), _worker(NULL), _connectionInfo(connInfo)
{
if (_asyncQueue) {
_worker = new DatabaseWorker(_asyncQueue, this);
Type = MYSQL_CONN_ASYNC;
} else
Type = MYSQL_CONN_SYNC;
}
DatabaseConnection::~DatabaseConnection()
{
assert(_mysql);
for (uint32_t i = 0; i < _stmts.size(); ++i)
delete _stmts[i];
mysql_close(_mysql);
}
bool DatabaseConnection::Open()
{
MYSQL* mysqlInit;
mysqlInit = mysql_init(NULL);
if (!mysqlInit)
{
sLog.Error(LOG_DATABASE, "Could not initialize Mysql connection to database `%s`", _connectionInfo.DB.c_str());
return false;
}
mysql_options(mysqlInit, MYSQL_SET_CHARSET_NAME, "utf8");
_mysql = mysql_real_connect(mysqlInit, _connectionInfo.Host.c_str(), _connectionInfo.User.c_str(),
_connectionInfo.Pass.c_str(), _connectionInfo.DB.c_str(), _connectionInfo.Port, NULL, 0);
if (_mysql)
{
sLog.Info(LOG_DATABASE, "Connected to MySQL database at %s", _connectionInfo.Host.c_str());
mysql_autocommit(_mysql, 1);
// set connection properties to UTF8 to properly handle locales for different
// server configs - core sends data in UTF8, so MySQL must expect UTF8 too
mysql_set_character_set(_mysql, "utf8");
return true;
}
else
{
sLog.Error(LOG_DATABASE, "Could not connect to MySQL database at %s: %s\n", _connectionInfo.Host.c_str(), mysql_error(mysqlInit));
mysql_close(mysqlInit);
return false;
}
}
void DatabaseConnection::Close()
{
delete this;
}
bool DatabaseConnection::Execute(const char* query)
{
sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Execute(): %s", query);
if (!query || !_mysql)
return false;
if (mysql_query(_mysql, query))
{
uint32_t lErrno = mysql_errno(_mysql);
sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql));
if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection)
return Execute(query); // Try again
return false;
}
else
return true;
}
ResultSet* DatabaseConnection::Query(const char* query)
{
sLog.Debug(LOG_DATABASE, "DatabaseConnectionMySQL::Query(): %s", query);
if (!query)
return NULL;
MYSQL_RES *result = NULL;
MYSQL_FIELD *fields = NULL;
uint64_t rowCount = 0;
uint32_t fieldCount = 0;
if (!_Query(query, &result, &fields, rowCount, fieldCount))
return NULL;
return new ResultSet(result, fields, rowCount, fieldCount);
}
bool DatabaseConnection::_Query(const char *query, MYSQL_RES** result, MYSQL_FIELD** fields, uint64_t& rowCount, uint32_t& fieldCount)
{
if (!_mysql)
return false;
if (mysql_query(_mysql, query))
{
uint32_t lErrno = mysql_errno(_mysql);
sLog.Error(LOG_DATABASE, "[%u] %s", lErrno, mysql_error(_mysql));
if (_HandleMySQLErrno(lErrno)) // If it returns true, an error was handled successfully (i.e. reconnection)
return _Query(query, result, fields, rowCount, fieldCount); // We try again
return false;
}
*result = mysql_store_result(_mysql);
rowCount = mysql_affected_rows(_mysql);
fieldCount = mysql_field_count(_mysql);
if (!result)
return false;
if (!rowCount)
{
mysql_free_result(*result);
return false;
}
*fields = mysql_fetch_fields(*result);
return true;
}
bool DatabaseConnection::Execute(PreparedStatement* stmt)
{
}
ResultSet* DatabaseConnection::Query(PreparedStatement* stmt)
{
}
void DatabaseConnection::PrepareStatement(uint32 index, const char* sql)
{
// For reconnection case
//if (m_reconnecting)
// delete m_stmts[index];
MYSQL_STMT* stmt = mysql_stmt_init(_mysql);
if (!stmt) {
sLog.Error(LOG_DATABASE, "In mysql_stmt_init() id: %u, sql: \"%s\"", index, sql);
sLog.Error(LOG_DATABASE, "%s", mysql_error(_mysql));
} else {
if (mysql_stmt_prepare(stmt, sql, strlen(sql))) {
sLog.Error(LOG_DATABASE, "In mysql_stmt_init() id: %u, sql: \"%s\"", index, sql);
sLog.Error(LOG_DATABASE, "%s", mysql_stmt_error(stmt));
mysql_stmt_close(stmt);
} else {
PreparedStatement* mStmt = new PreparedStatement(stmt);
_stmts[index] = mStmt;
}
}
}
bool DatabaseConnection::_HandleMySQLErrno(uint32_t lErrno)
{
return false;
}
}

82
src/server/shared/MySQL/DatabaseConnection.h

@ -0,0 +1,82 @@ @@ -0,0 +1,82 @@
#ifndef DATABASE_CONNECTION_MYSQL_H_
#define DATABASE_CONNECTION_MYSQL_H_
#include "DatabaseOperation.h"
#include "DatabaseWorker.h"
#include "QueryResult.h"
#include "PreparedStatement.h"
#include <boost/thread.hpp>
#include <mysql.h>
namespace MySQL
{
enum ConnectionType
{
MYSQL_CONN_SYNC,
MYSQL_CONN_ASYNC,
MYSQL_CONN_SIZE
};
struct ConnectionInfo
{
std::string Host;
uint16_t Port;
std::string User;
std::string Pass;
std::string DB;
};
class DatabaseConnection
{
public:
DatabaseConnection(ConnectionInfo connInfo, DatabaseWorkQueue* asyncQueue = NULL);
~DatabaseConnection();
bool Open();
void Close();
// Ping!
void Ping()
{
mysql_ping(_mysql);
}
// Queries
bool Execute(const char* query);
ResultSet* Query(const char* query);
// Stmt
bool Execute(PreparedStatement* stmt);
ResultSet* Query(PreparedStatement* stmt);
// Locking
bool LockIfReady()
{
return _mutex.try_lock();
}
void Unlock()
{
_mutex.unlock();
}
ConnectionType Type;
void PrepareStatement(uint32_t index, const char* sql);
private:
bool _Query(const char *sql, MYSQL_RES** result, MYSQL_FIELD** fields, uint64_t& pRowCount, uint32_t& pFieldCount);
bool _HandleMySQLErrno(uint32_t lErrno);
boost::mutex _mutex;
MYSQL* _mysql;
DatabaseWorkQueue* _asyncQueue;
DatabaseWorker* _worker;
std::vector<PreparedStatement*> _stmts;
ConnectionInfo _connectionInfo;
};
}
#endif

29
src/server/shared/MySQL/DatabaseOperation.cpp

@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
#include "DatabaseOperation.h"
#include "DatabaseConnection.h"
namespace MySQL
{
void DatabasePingOperation::Execute()
{
_conn->Ping();
}
void DatabasePreparedStatementOperation::Execute()
{
if (_callback) {
ResultSet* result = _conn->Query(_stmt);
_callback(QueryResult(result));
} else
_conn->Execute(_stmt);
}
void DatabaseQueryOperation::Execute()
{
if (_callback) {
ResultSet* result = _conn->Query(_query);
_callback(QueryResult(result));
} else
_conn->Execute(_query);
}
}

55
src/server/shared/MySQL/DatabaseOperation.h

@ -0,0 +1,55 @@ @@ -0,0 +1,55 @@
#ifndef DATABASE_OPERATION_MYSQL_H_
#define DATABASE_OPERATION_MYSQL_H_
#include "Util.h"
#include "DatabaseCallback.h"
#include "PreparedStatement.h"
#include <boost/bind.hpp>
namespace MySQL
{
class DatabaseConnection;
class DatabaseOperation
{
public:
DatabaseOperation(): _conn(NULL) {}
virtual void Execute() = 0;
void SetConnection(DatabaseConnection* conn) { _conn = conn; }
protected:
DatabaseConnection* _conn;
};
class DatabasePingOperation : public DatabaseOperation
{
void Execute();
};
class DatabasePreparedStatementOperation : public DatabaseOperation
{
public:
DatabasePreparedStatementOperation(PreparedStatement* stmt, DatabaseCallback callback = NULL): DatabaseOperation(), _stmt(stmt), _callback(callback) {};
void Execute();
private:
DatabaseCallback _callback;
PreparedStatement* _stmt;
};
class DatabaseQueryOperation : public DatabaseOperation
{
public:
DatabaseQueryOperation(const char* query, DatabaseCallback callback = NULL): DatabaseOperation(), _query(query), _callback(callback) {};
void Execute();
private:
DatabaseCallback _callback;
const char* _query;
};
typedef Util::SynchronisedQueue<DatabaseOperation*> DatabaseWorkQueue;
}
#endif

39
src/server/shared/MySQL/DatabaseWorker.cpp

@ -0,0 +1,39 @@ @@ -0,0 +1,39 @@
#include "DatabaseWorker.h"
#include "Log.h"
namespace MySQL
{
DatabaseWorker::DatabaseWorker(DatabaseWorkQueue* asyncQueue, DatabaseConnection* conn) :
_asyncQueue(asyncQueue), _conn(conn)
{
Activate();
}
void DatabaseWorker::Work()
{
sLog.Debug(LOG_DATABASE, "Database worker thread started");
if (!this->_asyncQueue)
return;
DatabaseOperation* op = NULL;
for (;;)
{
op = this->_asyncQueue->Dequeue();
if (!op) {
sLog.Debug(LOG_DATABASE, "Database worker thread exiting...");
return;
}
sLog.Debug(LOG_DATABASE, "Database worker working...");
op->SetConnection(_conn);
op->Execute();
delete op;
sLog.Debug(LOG_DATABASE, "Database worker finished!");
}
}
}

22
src/server/shared/MySQL/DatabaseWorker.h

@ -0,0 +1,22 @@ @@ -0,0 +1,22 @@
#ifndef DATABASE_WORKER_MYSQL_H_
#define DATABASE_WORKER_MYSQL_H_
#include "Util.h"
#include "DatabaseOperation.h"
namespace MySQL
{
class DatabaseConnection;
class DatabaseWorker : public Util::GenericWorker
{
public:
DatabaseWorker(DatabaseWorkQueue* asyncQueue, DatabaseConnection* conn);
void Work();
private:
DatabaseWorkQueue* _asyncQueue;
DatabaseConnection* _conn;
};
}
#endif

80
src/server/shared/MySQL/DatabaseWorkerPool.cpp

@ -0,0 +1,80 @@ @@ -0,0 +1,80 @@
#include "DatabaseWorkerPool.h"
namespace MySQL
{
bool DatabaseWorkerPool::Open(ConnectionInfo connInfo, uint8_t syncThreads, uint8_t asyncThreads)
{
bool res = true;
_connectionInfo = connInfo;
sLog.Info(LOG_DATABASE, "Opening MySQL Database Pool '%s'. Asynchronous threads: %u, synchronous threads: %u.", _connectionInfo.DB.c_str(), asyncThreads, syncThreads);
for (uint8_t i = 0; i < syncThreads; ++i)
{
DatabaseConnection* conn = new DatabaseConnection(_connectionInfo);
res &= conn->Open();
_connections[MYSQL_CONN_SYNC].push_back(conn);
}
for (uint8_t i = 0; i < syncThreads; ++i)
{
DatabaseConnection* conn = new DatabaseConnection(_connectionInfo, _asyncQueue);
res &= conn->Open();
_connections[MYSQL_CONN_ASYNC].push_back(conn);
}
if (res)
sLog.Info(LOG_DATABASE, "MySQL Database Pool '%s' opened successfully. %u total connections.", _connectionInfo.DB.c_str(), _connections[MYSQL_CONN_SYNC].size()+_connections[MYSQL_CONN_ASYNC].size());
else
sLog.Error(LOG_DATABASE, "Failed opening MySQL Database Pool to '%s'.", _connectionInfo.DB.c_str());
return res;
}
void DatabaseWorkerPool::Close()
{
sLog.Info(LOG_DATABASE, "Closing MySQL Database Pool '%s'.", _connectionInfo.DB.c_str());
// Stop worker threads
_asyncQueue->Stop();
for (uint8_t i = 0; i < _connections[MYSQL_CONN_SYNC].size(); ++i)
{
DatabaseConnection* conn = _connections[MYSQL_CONN_SYNC][i];
conn->Close();
}
for (uint8_t i = 0; i < _connections[MYSQL_CONN_ASYNC].size(); ++i)
{
DatabaseConnection* conn = _connections[MYSQL_CONN_ASYNC][i];
conn->Close();
}
sLog.Info(LOG_DATABASE, "Closed all connections to MySQL Database Pool '%s'.", _connectionInfo.DB.c_str());
}
bool DatabaseWorkerPool::PrepareStatement(index, const char* sql, PreparedStatementFlags flags)
{
if (flags & STMT_SYNC) {
for (uint8_t i = 0; i < _connections[MYSQL_CONN_SYNC].size(); ++i) {
DatabaseConnection* conn = _connections[MYSQL_CONN_SYNC][i];
if (!conn->PrepareStatement(index, sql)) {
sLog.Error(LOG_DATABASE, "Failed to prepare statement");
return false;
}
}
}
if (flags & STMT_ASYNC) {
for (uint8_t i = 0; i < _connections[MYSQL_CONN_ASYNC].size(); ++i) {
DatabaseConnection* conn = _connections[MYSQL_CONN_ASYNC][i];
if (!conn->PrepareStatement(index, sql)) {
sLog.Error(LOG_DATABASE, "Failed to prepare statement");
return false;
}
}
}
return true;
}
}

116
src/server/shared/MySQL/DatabaseWorkerPool.h

@ -0,0 +1,116 @@ @@ -0,0 +1,116 @@
#ifndef DATABASE_WORKER_POOL_MYSQL_H_
#define DATABASE_WORKER_POOL_MYSQL_H_
#include "DatabaseConnection.h"
#include "PreparedStatement.h"
#include "QueryResult.h"
#include "Log.h"
#include <vector>
namespace MySQL
{
class DatabaseWorkerPool
{
public:
DatabaseWorkerPool() : _asyncQueue(new DatabaseWorkQueue()) { }
~DatabaseWorkerPool()
{
delete _asyncQueue;
}
bool Open(ConnectionInfo connInfo, uint8_t syncThreads, uint8_t asyncThreads);
void Close();
// Queries
bool Execute(const char* query)
{
DatabaseConnection* conn = GetSyncConnection();
bool result = conn->Execute(query);
conn->Unlock();
return result;
}
QueryResult Query(const char* query)
{
DatabaseConnection* conn = GetSyncConnection();
ResultSet* result = conn->Query(query);
conn->Unlock();
return QueryResult(result);
}
// Stmt
bool Execute(PreparedStatement* stmt)
{
DatabaseConnection* conn = GetSyncConnection();
bool result = conn->Execute(stmt);
conn->Unlock();
return result;
}
QueryResult Query(PreparedStatement* stmt)
{
DatabaseConnection* conn = GetSyncConnection();
ResultSet* result = conn->Query(stmt);
conn->Unlock();
return QueryResult(result);
}
// Async
bool ExecuteAsync(const char* query)
{
DatabaseQueryOperation* op = new DatabaseQueryOperation(query);
_asyncQueue->Enqueue(op);
return true;
}
bool ExecuteAsync(PreparedStatement* stmt)
{
DatabasePreparedStatementOperation* op = new DatabasePreparedStatementOperation(stmt);
_asyncQueue->Enqueue(op);
return true;
}
bool QueryAsync(const char* query, DatabaseCallback callback)
{
DatabaseQueryOperation* op = new DatabaseQueryOperation(query, callback);
_asyncQueue->Enqueue(op);
return true;
}
bool QueryAsync(PreparedStatement* stmt, DatabaseCallback callback )
{
DatabasePreparedStatementOperation* op = new DatabasePreparedStatementOperation(stmt, callback);
_asyncQueue->Enqueue(op);
return true;
}
bool PrepareStatement(index, const char* sql, PreparedStatementFlags flags);
// Prepared Statements
PreparedStatement* GetPreparedStatement(uint32_t stmtid)
{
return NULL;//new PreparedStatement(stmtid);
}
private:
DatabaseConnection* GetSyncConnection()
{
uint32_t i;
uint8_t conn_size = _connections[MYSQL_CONN_SYNC].size();
DatabaseConnection* conn = NULL;
// Block until we find a free connection
for (;;)
{
conn = _connections[MYSQL_CONN_SYNC][++i % conn_size];
if (conn->LockIfReady())
break;
}
return conn;
}
std::vector<DatabaseConnection*> _connections[MYSQL_CONN_SIZE];
ConnectionInfo _connectionInfo;
DatabaseWorkQueue* _asyncQueue;
};
}
#endif

67
src/server/shared/MySQL/Field.h

@ -0,0 +1,67 @@ @@ -0,0 +1,67 @@
#ifndef FIELD_MYSQL_H_
#define FIELD_MYSQL_H_
#include <mysql.h>
#include <boost/lexical_cast.hpp>
namespace MySQL
{
class Field
{
public:
Field()
{
data.value = NULL;
data.type = MYSQL_TYPE_NULL;
data.length = 0;
}
void SetValue(char* value, enum_field_types type)
{
if (data.value)
CleanUp();
// This value stores somewhat structured data that needs function style casting
if (value)
{
size_t size = strlen(value);
data.value = new char [size+1];
strcpy((char*)data.value, value);
data.length = size;
}
data.type = type;
}
uint32_t GetUInt32()
{
return boost::lexical_cast<uint32_t>(data.value);
}
std::string GetString()
{
return boost::lexical_cast<std::string>(data.value);
}
double GetDouble()
{
return boost::lexical_cast<double>(data.value);
}
private:
struct
{
uint32_t length;
char* value;
enum_field_types type;
} data;
void CleanUp()
{
delete[] data.value;
data.value = NULL;
}
};
}
#endif

37
src/server/shared/MySQL/PreparedStatement.cpp

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
#include "PreparedStatement.h"
#include <cstring>
namespace MySQL
{
PreparedStatement::PreparedStatement(MYSQL_STMT* stmt) :
_stmt(stmt), _bind(NULL)
{
_paramCount = mysql_stmt_param_count(stmt);
_bind = new MYSQL_BIND[_paramCount];
memset(_bind, 0, sizeof(MYSQL_BIND)*_paramCount);
}
PreparedStatement::~PreparedStatement()
{
ClearParameters();
if (_stmt->bind_result_done)
{
delete[] _stmt->bind->length;
delete[] _stmt->bind->is_null;
}
mysql_stmt_close(_stmt);
delete[] _bind;
this->~PreparedStatement();
}
void PreparedStatement::ClearParameters()
{
for (uint8_t i = 0; i < _paramCount; ++i)
{
delete _bind[i].length;
_bind[i].length = NULL;
delete[] (char*) _bind[i].buffer;
_bind[i].buffer = NULL;
}
}
}

33
src/server/shared/MySQL/PreparedStatement.h

@ -0,0 +1,33 @@ @@ -0,0 +1,33 @@
#ifndef PREPARED_STATEMENT_MYSQL_H_
#define PREPARED_STATEMENT_MYSQL_H_
#include <boost/cstdint.hpp>
#include <mysql.h>
namespace MySQL
{
enum PreparedStatementFlags
{
STMT_SYNC,
STMT_ASYNC,
STMT_BOTH = STMT_SYNC | STMT_ASYNC
};
class PreparedStatement
{
public:
PreparedStatement(MYSQL_STMT* stmt);
~PreparedStatement();
template<class T>
void Set(const uint8_t index, const T value);
void ClearParameters();
private:
MYSQL_STMT* _stmt;
MYSQL_BIND* _bind;
uint8_t _paramCount;
};
}
#endif

53
src/server/shared/MySQL/QueryResult.cpp

@ -0,0 +1,53 @@ @@ -0,0 +1,53 @@
#include "QueryResult.h"
#include "Log.h"
namespace MySQL
{
ResultSet::ResultSet(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount) :
_result(result), _fields(fields), _rowCount(rowCount), _fieldCount(fieldCount)
{
_currentRow = new Field[_fieldCount];
}
ResultSet::~ResultSet()
{
}
bool ResultSet::NextRow()
{
MYSQL_ROW row;
if (!_result) {
sLog.Debug(LOG_DATABASE, "QueryResultMySQL::NextRow(): Empty result");
return false;
}
row = mysql_fetch_row(_result);
if (!row)
{
sLog.Debug(LOG_DATABASE, "QueryResultMySQL::NextRow(): End of result");
CleanUp();
return false;
}
for (uint32_t i = 0; i < _fieldCount; i++)
_currentRow[i].SetValue(row[i], _fields[i].type);
return true;
}
void ResultSet::CleanUp()
{
if (_currentRow)
{
delete [] _currentRow;
_currentRow = NULL;
}
if (_result)
{
mysql_free_result(_result);
_result = NULL;
}
}
}

49
src/server/shared/MySQL/QueryResult.h

@ -0,0 +1,49 @@ @@ -0,0 +1,49 @@
#ifndef QUERY_RESULT_MYSQL_H_
#define QUERY_RESULT_MYSQL_H_
#include "Field.h"
#include <boost/shared_ptr.hpp>
#include <cassert>
#include <mysql.h>
namespace MySQL
{
class ResultSet
{
public:
ResultSet(MYSQL_RES* result, MYSQL_FIELD* fields, uint64_t rowCount, uint32_t fieldCount);
~ResultSet();
// Metadata
uint64_t GetRowCount()
{
return _rowCount;
}
uint32_t GetFieldCount()
{
return _fieldCount;
}
bool NextRow();
Field* Fetch()
{
return _currentRow;
}
private:
uint64_t _rowCount;
Field* _currentRow;
uint32_t _fieldCount;
void CleanUp();
MYSQL_RES* _result;
MYSQL_FIELD* _fields;
};
typedef boost::shared_ptr<ResultSet> QueryResult;
}
#endif
Loading…
Cancel
Save