您的位置:首页 > 科技 > 能源 > Zookeeper的项目实现:

Zookeeper的项目实现:

2024/12/28 14:51:18 来源:https://blog.csdn.net/weixin_62946182/article/details/140904524  浏览:    关键词:Zookeeper的项目实现:

1. 创建 zk 的客户端类:

  • 实现与 zk Server 的连接、
  • 根据路径及URL(ip+port)创建结点
  • 由结点路径获取data (ip+port) 信息
#pragma  once#include<semaphore.h>
#include<zookeeper/zookeeper.h>
#include<string>//封装zk客户端类
class ZkClient{
public:ZkClient();~ZkClient();//zkCli 启动连接zkServervoid Star();//在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点void Create (const char *path,const char*data ,int datalen,int state=0 );//根据参数指定路径参数、值std::string GetData(const char*path);
private://客户端句柄zhandle_t *m_zhandle;
};

1.1. 创建客户端与服务器的连接:

void ZkClient::Star()代码解析

//全局观察器,获取zkServer 给zkClien 的回复通知void global_watcher(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx){if(type==ZOO_SESSION_EVENT)//回调消息类型是会话相关的类型{if(state==ZOO_CONNECTED_STATE){sem_t*sem=(sem_t*)zoo_get_context(zh);sem_post(sem);}}}// zkCli 启动连接zkServer
void ZkClient::Star()
{std::string host =MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");std::string port =MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");//param host comma separated host:port pairs, each corresponding to a zk//server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"std::string connstr=host+":"+port;m_zhandle=zookeeper_init(connstr.c_str(),global_watcher,3000,nullptr,nullptr,0);if(m_zhandle==nullptr){LOG_INFO("zookeeper_init error!");std::cout<<"zookeeper_init error!"<<std::endl;exit(EXIT_FAILURE);}/* This method creates a new handle and a zookeeper session that correspondsto that handle. Session establishment is asynchronous:meaning that the session should not be considered established until (and unless) anevent of state ZOO_CONNECTED_STATE is received.*/sem_t sem;sem_init(&sem,0,0);zoo_set_context(m_zhandle,&sem);sem_wait(&sem);LOG_ERROR("zookeeper_init success!");
}

我们需要调用 zkCli 的接口API 建立与 server 的连接:zookeeper_init

ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,int recv_timeout, const clientid_t *clientid, void *context, int flags);/*
const char *host:ZooKeeper 服务器的地址  IP :Port
param host comma separated host:port pairs
server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
*/
/*watcher_fn fn ,回调函数:返回 server 与 Cli 的连接状态信息
*/
/*
int recv_timeout:超时时间(默认30000  ms->30s)
*/
/*
const clientid_t *clientid: 一个指向 clientid_t 结构的指针包含了客户端的会话 ID 和会话密码
如果这是客户端的第一次连接(即没有现有的会话),则此参数应该为 NULL。
*/
/*
void *context: 一个用户定义的上下文指针,被传递给 watcher_fn 函数这允许用户在回调函数中访问特定于应用程序的数据
*/
/*
int flags: 初始化连接的标志:这些标志的具体含义取决于 ZooKeeper 客户端库的版本连接选项,如是否要启用只读模式等。
*/

zookeeper_init 基于zookeeper_mt(多线程),实现异步连接

主线程:执行当前的zookeeper_init内容

pthread_create:

子线程1:网络I/O线程 由于客户端无高并发要求,采用 poll

子线程2:watcher回调线程返回server连接后的状态信息,判断是否成功

主线程:

zookeeper_init该函数返回一个创建的 zkCli 句柄,

若返回值为空,则说明该语句执行失败

返回值不为空,并不能说明连接已经建立,而表示该语句执行成功并创建了m_zhandle 的内存空间

  1. zookeeper_init 在创建完 m_zhandle 的句柄后,设置信号量并初始为0(资源不可用)
  2. 将信号量设置为 m_zhandle 的上下文 (新增加的关联信息)
  3. 在 sem_wait(&sem) 调用时,由于信号量的值为 0 线程将会阻塞,直到在watcher线程中判断连接成功后调用了 sem_post(&sem) 来增加信号量的值被唤醒,至此连接结束

子线程1:网络I/O线程

进行网络的数据收发,发送客户端的连接请求、结点的建立请求,获取server返回cli的watcher的回调信息

子线程2:watcher回调线程

type==ZOO_SESSION_EVENT//回调消息类型是会话相关的类型
state==ZOO_CONNECTED_STATE)//判断连接状态
//连接成功后:
sem_t*sem=(sem_t*)zoo_get_context(zh);//由句柄指针获取句柄的上下文,关联信息sem
sem_post(sem);//增加信号量

1.2. 创建结点并写入数据信息:

// 在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{char path_buf[128];int path_len=sizeof(path_buf);int flag;//判断结点是否已经存在,避免重重复创建flag=zoo_exists(m_zhandle,path,0,nullptr);if(ZNONODE==flag)//结点不存在{flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buf,path_len);if(flag==ZOK){LOG_INFO("znode create success...path:%s:",path);}else{LOG_ERROR("znode create error,path:%s,flag:%d",path,flag);exit(EXIT_FAILURE);}}}

1.3. 获取结点数据:

// 根据参数指定路径->获取结点值
std::string ZkClient::GetData(const char *path)
{char buff[64];int buff_len=sizeof(buff);//调用接口实现由结点路径、获取该结点的data 写入buffint flag=zoo_get(m_zhandle,path,0,buff,&buff_len,nullptr);if(flag!=ZOK)//返回值为ZOK 表示成功!{std::cout<<"get znode errr ... path:"<<path<<std::endl;return "";}return buff;
}

构造析构函数:

ZkClient::ZkClient():m_zhandle(nullptr)
{
}
ZkClient::~ZkClient()
{if(m_zhandle!=nullptr)zookeeper_close(m_zhandle);
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com