您的位置:首页 > 文旅 > 美景 > 免费行情软件app网站直播下载_昆明软件开发公司_网站推广怎么推广_网站建设合同模板

免费行情软件app网站直播下载_昆明软件开发公司_网站推广怎么推广_网站建设合同模板

2025/2/26 22:26:16 来源:https://blog.csdn.net/2303_78095330/article/details/145850996  浏览:    关键词:免费行情软件app网站直播下载_昆明软件开发公司_网站推广怎么推广_网站建设合同模板
免费行情软件app网站直播下载_昆明软件开发公司_网站推广怎么推广_网站建设合同模板

目录

第一节:代码实现

        1-1.Binding类

         1-2.绑定管理思想

        1-3.BindingMapper类

        1-4.BindingManager类

第二节:单元测试

下期预告: 


        绑定管理模块在mqserver目录下实现。

第一节:代码实现

        绑定管理模块与交换机管理模块也是十分相似的,它们的思想是一样的,但是在内存管理的策略上略有不同。

        创建一个名为mq_binding.hpp的文件,打开并防止重复包含、添加所需头文件、声明命名空间,这些内容可以在mq_exchange.hpp中拷贝: 

#ifndef __M_BINDING_H__
#define __M_BINDING_H__#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"#include <memory>
#include <iostream>
#include <unordered_map>namespace zd
{};#endif

        1-1.Binding类

        一个绑定只需要记录该绑定的交换机名称、队列名称和钥匙即可。钥匙就是交换机进行消息审核的关键要素。        

    // 1.绑定类class Binding{public:using ptr = std::shared_ptr<Binding>;std::string exchange_name; // 交换机名称std::string msgqueue_name; // 队列名称std::string binding_key;   // 钥匙Binding(){}Binding(const std::string& ename,const std::string& qname,const std::string& bid_ky):exchange_name(ename),msgqueue_name(qname),binding_key(bid_ky){}};

         1-2.绑定管理思想

        当客户端发布消息时,交换机先得到消息,然后它会获取自己的所有绑定进行审核。这个过程是以交换机为主体的,所以保存时就把交换机作为key,与它绑定的所有队列组成绑定列表作为value,这样找到交换机就能找到和它绑定的所有队列了。

        光这样也是不行的,因为无法获取交换机和队列之间的 binding_key,所以value中的队列再映射当前交换机与队列的class Binding,这样就可以获取绑定的所有信息了:

    using MsgQueueBindingMap = std::unordered_map<std::string,Binding::ptr>; // 消息队列与其绑定关系 1-1using BindingMap = std::unordered_map<std::string,MsgQueueBindingMap>; // 交换机与其所有消息队列的绑定关系 1-n// 这是以交换机为主体,每个交换机可以绑定多个队列// 一个队列也可以被多个交换机绑定// 以此形成互相多对多的关系

        虽然以交换机为主体,但是队列和交换机的关系仍然是平等的,交换机可以和多个队列绑定,队列也可以出现在多个交换机的绑定列表中,即与多个交换机绑定。

        1-3.BindingMapper类

        绑定的持久化管理与交换机的持久化管理几乎一样,唯一需要注意的地方是获取数据的回调函数对于数据的处理:

    // 2.绑定的持久化管理类class BindingMapper{public:BindingMapper(const std::string& dbfile):_sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());}void createTable(){#define CREATE_TABLE__ "create table if not exists binding_table(\exchange_name varchar(32),\msgqueue_name varchar(32),\binding_key varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE__,nullptr,nullptr);if(ret == false){LOG("创建绑定数据库表失败!");abort(); // 异常退出}}void removeTable(){#define DROP_TABLE__ "drop table if exists binding_table;"bool ret = _sql_helper.exec(DROP_TABLE__,nullptr,nullptr);if(ret == false){LOG("删除绑定数据库表失败!");abort(); // 异常退出}}// 新增绑定信息bool insert(Binding::ptr& binding){#define INSERT_SQL__ "insert into binding_table values('%s','%s','%s');"char sql_str[4096] = {0};sprintf(sql_str,INSERT_SQL__,binding->exchange_name.c_str(),binding->msgqueue_name.c_str(),binding->binding_key.c_str());bool ret = _sql_helper.exec(sql_str,nullptr,nullptr);if(ret == false){LOG("数据库新增绑定信息失败!");return false;}return true;}// 移除绑定信息void remove(const std::string& ename,const std::string& qname){// "delete from binding_table where exchange_name='ename' and msgqueue_name='pname';";std::string DELE_SQL = "delete from binding_table where exchange_name='"+ename+"' and "+"msgqueue_name='"+qname+"';";bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr);if(ret == false){LOG("数据库删除绑定信息失败!");abort(); // 异常退出}}// 移除指定交换机的所有绑定void removeExchangeBinding(const std::string ename){std::string DELE_SQL = "delete from binding_table where exchange_name='"+ename+"';";bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr);if(ret == false){LOG("数据库删除绑定信息失败!");abort(); // 异常退出}}// 移除指定队列的所有绑定void removeMsgQueueBinding(const std::string qname){std::string DELE_SQL = "delete from binding_table where msgqueue_name='"+qname+"';";bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr);if(ret == false){LOG("数据库删除绑定信息失败!");abort(); // 异常退出}}// 获取历史数据BindingMap recovery(){createTable();BindingMap result;std::string sql = "select exchange_name,msgqueue_name,binding_key from binding_table;";// 它会根据数据库的数据量调整回调函数的调用次数,每次取出一套数据_sql_helper.exec(sql.c_str(),BindingMapper::selectCallback,&result);return result;}private:static int selectCallback(void* arg,int numcol,char** row,char** fields){BindingMap* result = (BindingMap*)arg;// 创建交换机-消息队列-key的绑定类Binding::ptr bmp = std::make_shared<Binding>(row[0],row[1],row[2]);// 找到该交换机对应的绑定列表MsgQueueBindingMap& qmap = (*result)[bmp->exchange_name];// 绑定列表添加新的队列qmap.insert(std::make_pair(bmp->msgqueue_name,bmp));return 0;}private:SqliteHelper _sql_helper;};

        1-4.BindingManager类

        需要注意的是获得/移除指定交换机的绑定信息时,可以一次性找到交换机的所有绑定;但是获得/移除指定队列的绑定信息时,需要遍历所有交换机的绑定列表,因为每个交换机都可能与该队列进行了绑定。

        其次,添加绑定时,如果交换机和队列都需要持久化,这个绑定才需要被持久化,所以设置缺省值为false,之后调用 bind() 函数之前外部会先进行判断,符合持久化条件才传入true。

    class BindingManager{public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string& dbfile):_mapper(dbfile){_bindings = _mapper.recovery();}// 添加一个绑定bool bind(const std::string& ename,const std::string& qname,const std::string& bid_ky,bool durable = false){std::unique_lock<std::mutex> lock(_mtx);auto it = _bindings.find(ename);// 交换机有绑定 && 交换机与消息队列有绑定if(it != _bindings.end() && it->second.find(qname) != it->second.end()) {// 绑定已经存在就不需要添加了return true;}// 创建该绑定Binding::ptr bid = std::make_shared<Binding>(ename,qname,bid_ky);// 绑定持久化if(durable == true){if(_mapper.insert(bid) == false){return false;}}// 在交换机-消息队列内存中添加改映射_bindings[ename].insert(std::make_pair(qname,bid));return true;}// 解绑void unBind(const std::string& ename,const std::string& qname){std::unique_lock<std::mutex> lock(_mtx);auto it = _bindings.find(ename);// 交换机无绑定 || 交换机与消息队列无绑定if(it == _bindings.end() || it->second.find(qname) == it->second.end()) {// 绑定不存在就不需要解绑了return;}// 解除绑定持久化_mapper.remove(ename,qname);// 解除交换机与消息队列的内存映射_bindings[ename].erase(qname);}// 移除指定交换机绑定信息void removeExchangeBindings(const std::string& ename){std::unique_lock<std::mutex> lock(_mtx);// 解除交换机的所有持久化绑定_mapper.removeExchangeBinding(ename);// 解除内存映射_bindings.erase(ename);}// 移除指定队列绑定信息void removeMsgQueueBindings(const std::string& qname){std::unique_lock<std::mutex> lock(_mtx);// 解除消息队列的所有持久化绑定_mapper.removeMsgQueueBinding(qname);// 解除内存映射// 遍历所有交换机,绑定了该队列的就解绑for(auto& eit:_bindings){auto qit = eit.second.find(qname);// 该队列存在,删除该绑定if(qit != eit.second.end()){_bindings[eit.first].erase(qit->first);}}}// 获得指定交换机的所有绑定关系MsgQueueBindingMap getExchangeBindings(const std::string& ename){std::unique_lock<std::mutex> lock(_mtx);auto eit = _bindings.find(ename);// 不存在返回空if(eit == _bindings.end()){return MsgQueueBindingMap();}return eit->second;}// 获取指定交换机&消息队列的绑定关系Binding::ptr getBindings(const std::string& ename,const std::string& qname){std::unique_lock<std::mutex> lock(_mtx);auto eit = _bindings.find(ename);// 不存在返回空if(eit == _bindings.end()){return Binding::ptr();}auto qit = eit->second.find(qname);if(qit == eit->second.end()){return Binding::ptr();}return qit->second;}// 判断绑定是否存在bool exists(const std::string& ename,const std::string& qname){std::unique_lock<std::mutex> lock(_mtx);auto eit = _bindings.find(ename);// 不存在返回falseif(eit == _bindings.end() || eit->second.find(qname) == eit->second.end()){return false;}return true;}// 获取所有绑定数size_t size(){std::unique_lock<std::mutex> lock(_mtx);// 遍历所有交换机,获得每个交换机的所有绑定数size_t all_count = 0;for(auto& eit:_bindings){all_count+=eit.second.size();}return all_count;}void clear(){std::unique_lock<std::mutex> lock(_mtx);_mapper.removeTable();_bindings.clear();}BindingMapper _mapper;private:std::mutex _mtx;BindingMap _bindings;};

第二节:单元测试

        进入mqtest目录。

        创建名为mq_binding_test.cc的文件,包含所需头文件并拷贝测试套件,直接拷贝即可:

#include "../mqserver/mq_binding.hpp"
#include <gtest/gtest.h>
#include <iostream>
#include <unordered_map>
zd::BindingManager::ptr bmp;
// 全局测试套件------------------------------------------------
// 自己初始化自己的环境,使不同单元测试之间解耦
class BindingTest :public testing::Environment
{
public:// 全部单元测试之前调用一次virtual void SetUp() override{std::cout << "单元测试执行前的环境初始化" << std::endl;bmp = std::make_shared<zd::BindingManager>("./data/meta.bd");}   // 全部单元测试之后调用一次virtual void TearDown() override{std::cout << "单元测试执行后的环境清理" << std::endl;// bmp->clear();}
};// 单元测试
// 测试名称与类名称相同,则会先调用SetUp
TEST(BindingTest,BindingTest_test1_Test)
{std::cout << "单元测试-1" << std::endl;// 添加绑定bmp->bind("e-1","q-1","",true);bmp->bind("e-1","q-2","",true);bmp->bind("e-2","q-1","",true);bmp->bind("e-2","q-2","",true);bmp->bind("e-3","q-1","",false);ASSERT_EQ(bmp->size(),5);bmp->unBind("e-1","q-3"); // 解除一个不存在的绑定ASSERT_EQ(bmp->size(),5);bmp->removeMsgQueueBindings("q-1"); // 解除q-1的所有绑定ASSERT_EQ(bmp->size(),2);
}TEST(BindingTest,BindingTest_test2_Test)
{std::cout << "单元测试-2" << std::endl;zd::MsgQueueBindingMap m = bmp->getExchangeBindings("e-1");for(auto& binding:m){LOG("%s",binding.first.c_str());}
}TEST(BindingTest,BindingTest_test3_Test)
{std::cout << "单元测试-3" << std::endl;
}
// 单元测试全部结束后调用TearDown// ----------------------------------------------------------
int main(int argc,char** argv)
{testing::InitGoogleTest(&argc,argv);testing::AddGlobalTestEnvironment(new BindingTest); // 注册Test的所有单元测试if(RUN_ALL_TESTS() != 0) // 运行所有单元测试{printf("单元测试失败!\n");}return 0;
}

        执行结果:

                        ​​​​​​​        ​​​​​​​

        没有报错,且打印符合预期。

        然后查看meta.bd中,binding_table中的所有数据:

                                                         

        因为binding_key是空的,所以没有保存数据,也是符合预期的。        

         最后又测试历史数据的拉取功能,注释掉单元测试-1和单元测试-2:

TEST(BindingTest,BindingTest_test3_Test)
{std::cout << "单元测试-3" << std::endl;ASSERT_EQ(bmp->exists("e-1","q-2"),true);ASSERT_EQ(bmp->exists("e-2","q-2"),true);ASSERT_EQ(bmp->exists("e-2","q-1"),false);
}

        结果也没有报错:

        ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​    

        其他的一些接口可以自己测试,篇幅原因不好展现了。

        至此,绑定管理模块也完成了。

下期预告: 

        最后是消息管理模块,它的管理思路与前三个模块都不同,因为它使用普通的文件,而不是sqlite数据库来保存持久化消息,涉及很多文件的操作。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com