您的位置:首页 > 汽车 > 新车 > 魔兽世界服务端TrinityCore连接池源码剖析

魔兽世界服务端TrinityCore连接池源码剖析

2025/1/26 14:32:17 来源:https://blog.csdn.net/weixin_50448879/article/details/141498548  浏览:    关键词:魔兽世界服务端TrinityCore连接池源码剖析

简介

  • 魔兽世界服务器中数据库使用mysql来存储,并且数据库模块是直接嵌入在serve中,并没有单独的DB server
  • 在魔兽连接池中有两种连接池,一种是同步连接池,还有异步连接池
  • 连接池相关源码目录 TrinityCore-master\src\server\database\Database
  • 连接池具体文件:DatabaseWorkerPool.h DatabaseWorkerPool.cpp

在连接池源码中涉及到了一些其他类,读者如有不懂,我已经列出清单,可以自行查看相关源码实现

TnWKluwRIc4zsfE

核心源码讲解

DatabaseWorkerPool是一个模板类,提供了四个类,分别代表魔兽世界四个数据库的连接池

/// Accessor to the world database
TC_DATABASE_API extern DatabaseWorkerPool<WorldDatabaseConnection> WorldDatabase;
/// Accessor to the character database
TC_DATABASE_API extern DatabaseWorkerPool<CharacterDatabaseConnection> CharacterDatabase;
/// Accessor to the realm/login database
TC_DATABASE_API extern DatabaseWorkerPool<LoginDatabaseConnection> LoginDatabase;
/// Accessor to the hotfix database
TC_DATABASE_API extern DatabaseWorkerPool<HotfixDatabaseConnection> HotfixDatabase;

数据库类型

  • IDX_ASYNC: 异步连接池类型
  • IDX_SYNCH: 同步连接池类型

IDX_SIZE主要用于元素数量使用

enum InternalIndex{// 异步 0IDX_ASYNC,// 同步 1IDX_SYNCH,// 2 利用enum元素下标从零开始的性质,可以当作枚举长度使用IDX_SIZE};

核心成员

  • _queue: 线程池任务队列

  • _connections:连接池(同步连接池和异步连接池用这一个),异步连接池下标为0,同步连接池下标为1

    • EFY76IoJjPlHayx
  • _connectionInfo: 存储数据库信息

    •   struct TC_DATABASE_API MySQLConnectionInfo{explicit MySQLConnectionInfo(std::string const& infoString);// 用户名std::string user;// 密码std::string password;// 数据库名std::string database;// 主机地址std::string host;// 端口std::string port_or_socket;std::string ssl;};
      
  • _preparedStatementSize:预加载sql语句数量,与业务有关,实现存储的sql语句

  • _async_threads:异步线程池的线程数量

  • _synch_threads:同步线程池的线程数量

        // SQL任务队列,生产者消费者模型std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;// 连接池(同步连接池和异步连接池用这一个)std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;// 存储数据库信息std::unique_ptr<MySQLConnectionInfo> _connectionInfo;// 预加载sql语句数量std::vector<uint8> _preparedStatementSize;// 异步线程数量,同步线程数量uint8 _async_threads, _synch_threads;

核心源码讲解

初始化构造函数

创建连接池对象以及清零线程池数量

template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool(): _queue(new ProducerConsumerQueue<SQLOperation*>()),_async_threads(0), _synch_threads(0)
{
}

析构函数释放资源

将连接池中所有mysql连接对象释放

template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{_queue->Cancel();
}

SetConnectionInfo 设置数据库信息以及连接池数量

template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,uint8 const asyncThreads, uint8 const synchThreads)
{_connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);_async_threads = asyncThreads;_synch_threads = synchThreads;
}

OpenConnections创建一条指定的连接

// brief:通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{for (uint8 i = 0; i < numConnections; ++i){// 创建一条MySQLConnection连接了类auto connection = [&] {switch (type){case IDX_ASYNC:return std::make_unique<T>(_queue.get(), *_connectionInfo);case IDX_SYNCH:return std::make_unique<T>(*_connectionInfo);default:ABORT();}}();// 与mysql进行实际的连接if (uint32 error = connection->Open()){// Failed to open a connection or invalid version, abort and cleanup_connections[type].clear();return error;}
// 对mysql版本进行判断,不能低于最低要求版本
#ifndef LIBMARIADBelse if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
#elseelse if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
#endif{
#ifndef LIBMARIADBTC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
#elseTC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
#endifreturn 1;}else{// 将生成的mysql连接加入对应的mysql连接池_connections[type].push_back(std::move(connection));}}// Everything is finereturn 0;
}

Open 初始化连接池

初始化连接池,生成指定数量的同步和异步mysql连接加入连接池

template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{WPFatal(_connectionInfo.get(), "Connection info was not set!");TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. ""Asynchronous connections: {}, synchronous connections: {}.",GetDatabaseName(), _async_threads, _synch_threads);// 在异步连接池中添加_async_threads数量的异步连接uint32 error = OpenConnections(IDX_ASYNC, _async_threads);if (error)return error;// 在同步连接池中添加_sync_threads数量的同步连接error = OpenConnections(IDX_SYNCH, _synch_threads);if (!error){TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. ""{} total connections running.", GetDatabaseName(),(_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));}return error;
}

close 关闭连接

关闭所有的同步连接以及异步连接

template <class T>
void DatabaseWorkerPool<T>::Close()
{TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());//! Closes the actualy MySQL connection.// 关闭所有的异步连接_connections[IDX_ASYNC].clear();TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. ""Proceeding with synchronous connections.",GetDatabaseName());//! Shut down the synchronous connections//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close//! should only be called after any other thread tasks in the core have exited,//! meaning there can be no concurrent access at this point.// 关闭所有的同步连接_connections[IDX_SYNCH].clear();TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}

GetFreeConnection获取一条空闲连接

template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUGif (_warnSyncQueries){std::ostringstream ss;ss << boost::stacktrace::stacktrace();TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());}
#endifuint8 i = 0;auto const num_cons = _connections[IDX_SYNCH].size();T* connection = nullptr;//! Block forever until a connection is free// 循环遍历,获取一条空闲连接返回for (;;){connection = _connections[IDX_SYNCH][i++ % num_cons].get();//! Must be matched with t->Unlock() or you will get deadlocksif (connection->LockIfReady())break;}return connection;
}

完整源码

DatabaseWorkerPool.h

#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H#include "Define.h"
#include "DatabaseEnvFwd.h"
#include "StringFormat.h"
#include <array>
#include <string>
#include <vector>template <typename T>
class ProducerConsumerQueue;class SQLOperation;
struct MySQLConnectionInfo;template <class T>
class DatabaseWorkerPool
{private:enum InternalIndex{// 异步 0IDX_ASYNC,// 同步 1IDX_SYNCH,// 利用enum元素下标从零开始的性质,可以当作枚举长度使用IDX_SIZE};public:/* Activity state */DatabaseWorkerPool();~DatabaseWorkerPool();// 设置数据库信息以及异步/同步连接池线程数量大小void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads);// 初始化连接池,生成指定数量的同步和异步mysql连接加入连接池uint32 Open();// 将同步连接池与异步连接池的mysql连接进行销毁void Close();//! Prepares all prepared statementsbool PrepareStatements();inline MySQLConnectionInfo const* GetConnectionInfo() const{return _connectionInfo.get();}/**延迟单向语句方法。*/// 以字符串格式对将异步执行的单向SQL操作进行排队。// 这个方法应该只用于只执行一次的查询,例如在启动期间。// 将sql语句加入任务队列void Execute(char const* sql);// !以字符串格式(带有可变参数)对将异步执行的单向SQL操作进行排队。// !这个方法应该只用于只执行一次的查询,例如在启动期间。template<typename... Args>void PExecute(Trinity::FormatString<Args...> sql, Args&&... args){if (Trinity::IsFormatEmptyOrNull(sql))return;Execute(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());}// !以准备好的语句格式对将异步执行的单向SQL操作进行排队。// !语句必须使用CONNECTION_ASYNC标志来准备。void Execute(PreparedStatement<T>* stmt);/**直接同步单向语句方法。*/// !直接以字符串格式执行单向SQL操作,这将阻塞调用线程,直到完成。// !这个方法应该只用于只执行一次的查询,例如在启动期间。void DirectExecute(char const* sql);// !直接以字符串格式(带有可变参数)执行单向SQL操作,这将阻塞调用线程,直到完成。// !这个方法应该只用于只执行一次的查询,例如在启动期间。template<typename... Args>void DirectPExecute(Trinity::FormatString<Args...> sql, Args&&... args){if (Trinity::IsFormatEmptyOrNull(sql))return;DirectExecute(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());}// !直接以准备好的语句格式执行单向SQL操作,这将阻塞调用线程,直到完成。// !语句必须使用connection_sync标志来准备。void DirectExecute(PreparedStatement<T>* stmt);/**同步查询(带结果集)方法。*/// !直接以字符串格式执行SQL查询,该查询将阻塞调用线程,直到完成。// 返回引用计数的自动指针,不需要在上层代码中手动内存管理。QueryResult Query(char const* sql, T* connection = nullptr);// !直接以字符串格式执行SQL查询(带有可变参数),这将阻塞调用线程,直到完成。// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。template<typename... Args>QueryResult PQuery(Trinity::FormatString<Args...> sql, T* conn, Args&&... args){if (Trinity::IsFormatEmptyOrNull(sql))return QueryResult(nullptr);return Query(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str(), conn);}// !直接以字符串格式执行SQL查询(带有可变参数),这将阻塞调用线程,直到完成。// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。template<typename... Args>QueryResult PQuery(Trinity::FormatString<Args...> sql, Args&&... args){if (Trinity::IsFormatEmptyOrNull(sql))return QueryResult(nullptr);return Query(Trinity::StringFormat(sql, std::forward<Args>(args)...).c_str());}// !直接以准备好的格式执行SQL查询,该查询将阻塞调用线程,直到完成。// !返回引用计数的自动指针,不需要在上层代码中手动内存管理。// !语句必须带有CONNECTION_SYNCH标志。PreparedQueryResult Query(PreparedStatement<T>* stmt);/**异步查询(带结果集)方法。*/// !以字符串格式对查询进行排队,该查询将在查询执行后立即设置QueryResultFuture返回对象的值。// !然后在ProcessQueryCallback方法中处理返回值。QueryCallback AsyncQuery(char const* sql);// !以准备好的格式对查询进行队列,该格式将在查询执行后立即设置PreparedQueryResultFuture返回对象的值。// !然后在ProcessQueryCallback方法中处理返回值。// !语句必须使用CONNECTION_ASYNC标志来准备。QueryCallback AsyncQuery(PreparedStatement<T>* stmt);// !将设置QueryResultHolderFuture值的SQL操作向量(可以是临时的,也可以是准备好的)排队// !查询执行后立即返回对象。// !然后在ProcessQueryCallback方法中处理返回值。// !添加到这个holder中的任何预处理语句都需要使用CONNECTION_ASYNC标志进行预处理。SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder);/**事务上下文方法。*/// !开始一个自动托管事务指针,如果未提交,该指针将自动回滚。(自动提交= 0)SQLTransaction<T> BeginTransaction();// !对一组单向SQL操作(可以是临时的,也可以是准备好的)进行排队。这些操作的顺序// !在执行过程中,附加到事务中的内容将受到尊重。void CommitTransaction(SQLTransaction<T> transaction);//! 对一组单向SQL操作(可以是临时的,也可以是准备好的)进行排队。这些操作的顺序// !在执行过程中,附加到事务中的内容将受到尊重。TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);// !直接执行一组单向SQL操作(可以是临时的,也可以是准备好的)。这些操作的顺序// !在执行过程中,附加到事务中的内容将受到尊重。void DirectCommitTransaction(SQLTransaction<T>& transaction);// !方法,用于在不同上下文中执行特别语句。// !如果存在有效对象,将封装在事务中,否则单独执行。void ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql);// !方法,用于在不同上下文中执行准备好的语句。// !如果存在有效对象,将封装在事务中,否则单独执行。void ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt);/**Other*/typedef typename T::Statements PreparedStatementIndex;// !自动管理(内部)指针,指向一个准备好的语句对象,供上层代码使用。// !在this->DirectExecute(PreparedStatement*)、this->Query(PreparedStatement*)或PreparedStatementTask::~PreparedStatementTask中删除指针。// !在执行之前,这个对象不会绑定到MySQL上下文中准备好的语句。PreparedStatement<T>* GetPreparedStatement(PreparedStatementIndex index);// !为当前排序应用转义字符串。(use utf8)// 字符串转义void EscapeString(std::string& str);//!保持我们所有的MySQL连接存活,防止服务器断开我们的连接。void KeepAlive();void WarnAboutSyncQueries([[maybe_unused]] bool warn){
#ifdef TRINITY_DEBUG_warnSyncQueries = warn;
#endif}size_t QueueSize() const;private:// 通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池uint32 OpenConnections(InternalIndex type, uint8 numConnections);unsigned long EscapeString(char* to, char const* from, unsigned long length);// 将任务加入任务队列void Enqueue(SQLOperation* op);// 获取同步连接池中的空闲连接。// 调用者必须在触及MySQL上下文后调用t->Unlock()以防止死锁。T* GetFreeConnection();// 获取数据库名char const* GetDatabaseName() const;// SQL任务队列,生产者消费者模型std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;// 连接池(同步连接池和异步连接池用这一个)std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;// 存储数据库信息std::unique_ptr<MySQLConnectionInfo> _connectionInfo;// 预加载sql语句数量std::vector<uint8> _preparedStatementSize;// 异步线程数量,同步线程数量uint8 _async_threads, _synch_threads;
#ifdef TRINITY_DEBUGstatic inline thread_local bool _warnSyncQueries = false;
#endif
};#endif

DatabaseWorkerPool.cpp

#include "DatabaseWorkerPool.h"
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
#include "Implementation/HotfixDatabase.h"
#include "Log.h"
#include "MySQLPreparedStatement.h"
#include "PreparedStatement.h"
#include "ProducerConsumerQueue.h"
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "SQLOperation.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
#include <mysqld_error.h>
#ifdef TRINITY_DEBUG
#include <sstream>
#include <boost/stacktrace.hpp>
#endif#define MIN_MYSQL_SERVER_VERSION 50700u
#define MIN_MYSQL_SERVER_VERSION_STRING "5.7"
#define MIN_MYSQL_CLIENT_VERSION 50700u
#define MIN_MYSQL_CLIENT_VERSION_STRING "5.7"#define MIN_MARIADB_SERVER_VERSION 100209u
#define MIN_MARIADB_SERVER_VERSION_STRING "10.2.9"
#define MIN_MARIADB_CLIENT_VERSION 30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"class PingOperation : public SQLOperation
{//! Operation for idle delaythreads// 测试mysql连接是否可以ping通bool Execute() override{m_conn->Ping();return true;}
};template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool(): _queue(new ProducerConsumerQueue<SQLOperation*>()),_async_threads(0), _synch_threads(0)
{WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200WPFatal(mysql_get_client_version() >= MIN_MARIADB_CLIENT_VERSION, "TrinityCore does not support MariaDB versions below " MIN_MARIADB_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MariaDB client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MARIADB_CLIENT_VERSION);WPFatal(mysql_get_client_version() == MARIADB_PACKAGE_VERSION_ID, "Used MariaDB library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MARIADB_PACKAGE_VERSION_ID);
#elseWPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below " MIN_MYSQL_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MySQL client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MYSQL_CLIENT_VERSION);WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
#endif
}template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{_queue->Cancel();
}template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,uint8 const asyncThreads, uint8 const synchThreads)
{_connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);_async_threads = asyncThreads;_synch_threads = synchThreads;
}template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{WPFatal(_connectionInfo.get(), "Connection info was not set!");TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. ""Asynchronous connections: {}, synchronous connections: {}.",GetDatabaseName(), _async_threads, _synch_threads);// 在异步连接池中添加_async_threads数量的异步连接uint32 error = OpenConnections(IDX_ASYNC, _async_threads);if (error)return error;// 在同步连接池中添加_sync_threads数量的同步连接error = OpenConnections(IDX_SYNCH, _synch_threads);if (!error){TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. ""{} total connections running.", GetDatabaseName(),(_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));}return error;
}template <class T>
void DatabaseWorkerPool<T>::Close()
{TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());//! Closes the actualy MySQL connection.// 关闭所有的异步连接_connections[IDX_ASYNC].clear();TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. ""Proceeding with synchronous connections.",GetDatabaseName());//! Shut down the synchronous connections//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close//! should only be called after any other thread tasks in the core have exited,//! meaning there can be no concurrent access at this point.// 关闭所有的同步连接_connections[IDX_SYNCH].clear();TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}template <class T>
bool DatabaseWorkerPool<T>::PrepareStatements()
{// connections == std::vector<std::unique_ptr<T>>// 先处理异步连接池,在处理同步连接池for (auto& connections : _connections){// connection == std::unique_ptr<T> == 一条mysql连接抽象类for (auto& connection : connections){// 对mysql连接抽象类上锁,是否允许其他线程访问此连接connection->LockIfReady();// 执行mysql连接具体派生类(具体数据库)的预加载sql语句(业务相关)// 具体可查看数据库派生类的DoPrepareStatements函数if (!connection->PrepareStatements()){connection->Unlock();Close();return false;}elseconnection->Unlock();// 获取预加载sql的数量size_t const preparedSize = connection->m_stmts.size();if (_preparedStatementSize.size() < preparedSize)_preparedStatementSize.resize(preparedSize);// 初始化预加载sqlfor (size_t i = 0; i < preparedSize; ++i){// already set by another connection// (each connection only has prepared statements of it's own type sync/async)if (_preparedStatementSize[i] > 0)continue;if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get()){uint32 const paramCount = stmt->GetParameterCount();// TC only supports uint8 indices.ASSERT(paramCount < std::numeric_limits<uint8>::max());_preparedStatementSize[i] = static_cast<uint8>(paramCount);}}}}return true;
}template <class T>
QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
{if (!connection)connection = GetFreeConnection();ResultSet* result = connection->Query(sql);connection->Unlock();if (!result || !result->GetRowCount() || !result->NextRow()){delete result;return QueryResult(nullptr);}return QueryResult(result);
}template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{// 获取一条空闲连接auto connection = GetFreeConnection();// 执行stmt指定的预加载sqlPreparedResultSet* ret = connection->Query(stmt);connection->Unlock();//! Delete proxy-class. Not needed anymoredelete stmt;if (!ret || !ret->GetRowCount()){delete ret;return PreparedQueryResult(nullptr);}// 将结果返回return PreparedQueryResult(ret);
}template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{// BasicStatementTask:内部包装了std::future以及存储sql语句BasicStatementTask* task = new BasicStatementTask(sql, true);// 在进入队列之前存储未来的结果-任务可能在从此方法返回之前已经被处理和删除QueryResultFuture result = task->GetFuture();// 将任务加入生产者消费者模型队列中Enqueue(task);// 异步返回结果return QueryCallback(std::move(result));
}template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{PreparedStatementTask* task = new PreparedStatementTask(stmt, true);// Store future result before enqueueing - task might get already processed and deleted before returning from this methodPreparedQueryResultFuture result = task->GetFuture();Enqueue(task);return QueryCallback(std::move(result));
}template <class T>
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);// Store future result before enqueueing - task might get already processed and deleted before returning from this methodQueryResultHolderFuture result = task->GetFuture();Enqueue(task);return { std::move(holder), std::move(result) };
}template <class T>
SQLTransaction<T> DatabaseWorkerPool<T>::BeginTransaction()
{return std::make_shared<Transaction<T>>();
}template <class T>
void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG// !只在调试模式下分析事务弱点。// !理想情况下,我们在调试模式下捕获错误,然后纠正它们,// !所以没有必要在Release模式下浪费这些CPU周期。switch (transaction->GetSize()){case 0:TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");return;case 1:TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");break;default:break;}
#endif // TRINITY_DEBUGEnqueue(new TransactionTask(transaction));
}// 异步提交事务
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG//! Only analyze transaction weaknesses in Debug mode.//! Ideally we catch the faults in Debug mode and then correct them,//! so there's no need to waste these CPU cycles in Release mode.switch (transaction->GetSize()){case 0:TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");break;case 1:TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");break;default:break;}
#endif // TRINITY_DEBUGTransactionWithResultTask* task = new TransactionWithResultTask(transaction);TransactionFuture result = task->GetFuture();Enqueue(task);return TransactionCallback(std::move(result));
}// 同步提交事务
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{T* connection = GetFreeConnection();int errorCode = connection->ExecuteTransaction(transaction);if (!errorCode){connection->Unlock();      // OK, operation succesfulreturn;}//! Handle MySQL Errno 1213 without extending deadlock to the core itself/// @todo More elegant wayif (errorCode == ER_LOCK_DEADLOCK){//todo: handle multiple sync threads deadlocking in a similar way as async threadsuint8 loopBreaker = 5;for (uint8 i = 0; i < loopBreaker; ++i){if (!connection->ExecuteTransaction(transaction))break;}}//! Clean up now.transaction->Cleanup();connection->Unlock();
}template <class T>
PreparedStatement<T>* DatabaseWorkerPool<T>::GetPreparedStatement(PreparedStatementIndex index)
{return new PreparedStatement<T>(index, _preparedStatementSize[index]);
}// 字符串转义
template <class T>
void DatabaseWorkerPool<T>::EscapeString(std::string& str)
{if (str.empty())return;char* buf = new char[str.size() * 2 + 1];EscapeString(buf, str.c_str(), uint32(str.size()));str = buf;delete[] buf;
}template <class T>
void DatabaseWorkerPool<T>::KeepAlive()
{//! Ping同步连接for (auto& connection : _connections[IDX_SYNCH]){if (connection->LockIfReady()){connection->Ping();connection->Unlock();}}// !假设所有工作线程都是空闲的,每个工作线程将接收1个ping操作请求// !如果一个或多个工作线程很忙,ping操作将不会平均分配,但这无关紧要// !因为唯一的目的是防止连接空转。auto const count = _connections[IDX_ASYNC].size();for (uint8 i = 0; i < count; ++i)Enqueue(new PingOperation);
}// brief:通过type指定的连接池类型,生成numConnections数量条mysql连接加入连接池
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{for (uint8 i = 0; i < numConnections; ++i){// 创建一条MySQLConnection连接了类auto connection = [&] {switch (type){case IDX_ASYNC:return std::make_unique<T>(_queue.get(), *_connectionInfo);case IDX_SYNCH:return std::make_unique<T>(*_connectionInfo);default:ABORT();}}();// 与mysql进行实际的连接if (uint32 error = connection->Open()){// Failed to open a connection or invalid version, abort and cleanup_connections[type].clear();return error;}
// 对mysql版本进行判断,不能低于最低要求版本
#ifndef LIBMARIADBelse if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
#elseelse if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
#endif{
#ifndef LIBMARIADBTC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
#elseTC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
#endifreturn 1;}else{// 将生成的mysql连接加入对应的mysql连接池_connections[type].push_back(std::move(connection));}}// Everything is finereturn 0;
}template <class T>
unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
{if (!to || !from || !length)return 0;return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}template <class T>
void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op)
{_queue->Push(op);
}template <class T>
size_t DatabaseWorkerPool<T>::QueueSize() const
{return _queue->Size();
}// 获取一条空闲连接
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUGif (_warnSyncQueries){std::ostringstream ss;ss << boost::stacktrace::stacktrace();TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());}
#endifuint8 i = 0;auto const num_cons = _connections[IDX_SYNCH].size();T* connection = nullptr;//! Block forever until a connection is free// 循环遍历,获取一条空闲连接返回for (;;){connection = _connections[IDX_SYNCH][i++ % num_cons].get();//! Must be matched with t->Unlock() or you will get deadlocksif (connection->LockIfReady())break;}return connection;
}template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{return _connectionInfo->database.c_str();
}template <class T>
void DatabaseWorkerPool<T>::Execute(char const* sql)
{if (!sql)return;BasicStatementTask* task = new BasicStatementTask(sql);Enqueue(task);
}template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{PreparedStatementTask* task = new PreparedStatementTask(stmt);Enqueue(task);
}template <class T>
void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
{if (!sql)return;T* connection = GetFreeConnection();connection->Execute(sql);connection->Unlock();
}template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{T* connection = GetFreeConnection();connection->Execute(stmt);connection->Unlock();//! Delete proxy-class. Not needed anymoredelete stmt;
}template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql)
{if (!trans)Execute(sql);elsetrans->Append(sql);
}template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt)
{if (!trans)Execute(stmt);elsetrans->Append(stmt);
}template class TC_DATABASE_API DatabaseWorkerPool<LoginDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<WorldDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<CharacterDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<HotfixDatabaseConnection>;

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com