etcd是存储键值数据的服务器
客户端通过长连接watch实时更新数据
场景:
当主机A给服务器存储 name: 小王
主机B从服务器中查name ,得到name-小王
当主机A更改name 小李
服务器实时通知主机B name 已经被更改成小李了。
应用:服务注册与发现
更改配置
若需要修改,则可以配置:/etc/default/etcd
sudo vi /etc/profile
在最后加上 export ETCDCTL_API=3 来确定etcd的版本 (每个终端都要设置,报错了可以尝试查看该终端上是否更改了etcd的版本)
使用
1.启动 sudo systemctl start etcd
source /etc/profile
2.添加 etcdctl put
3.查找 etcdctl get
4.删除 etcdctl del
一些接口的使用
使用样例
put:
#include<iostream>
#include<etcd/Client.hpp>
#include<etcd/KeepAlive.hpp>
#include<etcd/Response.hpp>
#include<etcd/Value.hpp>
#include<etcd/Watcher.hpp>int main(int argc,char * argv[])
{//创建Clientconst string Host_url="http://127.0.0.1:2379"; etcd::Client client(Host_url); //用url初始化客户端//获取lease_id//keep_alive是一个保活对象,leasekeepalive(5),指的是生命周期为5s,//leasekeepalive()函数返回一个异步操作的对象,get()函数是指等待该异步对象操作完成,操作失败会抛出异常auto keep_alive=client.leasekeepalive(5).get(); //keep_alive是一个//获取租约的id,用于putauto leaseid=keep_alive->Lease();cout<<leaseid<<endl;//插入键值,传入key-value 以及leaseid,put操作返回异步对象auto ret=client.put("/service/user","127.0.0.1:8080",leaseid).get();if(ret.is_ok()==false){cout<<"插入键值失败了"<<endl;exit(1);}auto ret2=client.put("/service/friend","127.0.0.1:8081",leaseid).get();if(ret.is_ok()==false){cout<<"插入键值失败了"<<endl;exit(1);}//暂停该进程std::this_thread::sleep_for(std::chrono::seconds(10));
}
源码:
初始化
获取保活对象,异步对象
get:
#include<iostream>
#include<etcd/Client.hpp>
#include<etcd/KeepAlive.hpp>
#include<etcd/Response.hpp>
#include<etcd/Value.hpp>
#include<etcd/Watcher.hpp>
//自己设置的回调函数。
//
void cback(const etcd::Response& re)
{//照搬txt目录下的 watch检查是否出错if (re.error_code()) {std::cout << "Watcher " << re.watch_id() << " fails with "<< re.error_code() << ": " << re.error_message() << std::endl;}//源码 class Event {// public:// enum class EventType {// PUT,// DELETE_,// INVALID,// };//匹配事件for(const auto& e:re.events()){if(e.event_type()==etcd::Event::EventType::PUT){cout<<"你的key-value已经发生了改变"<<endl;cout<<"之前的key::"<<e.prev_kv().key()<<"-value"<<e.prev_kv().as_string()<<endl;cout<<"之前的key::"<<e.kv().key()<<"-value"<<e.kv().as_string()<<endl;}else if(e.event_type()==etcd::Event::EventType::DELETE_){ cout<<"你的value已经被删除了"<<endl;cout<<"之前的key::"<<e.prev_kv().key()<<"-value:"<<e.prev_kv().as_string()<<endl; }}
}
int main()
{const string Host_url="http://127.0.0.1:2379";//根据库中指定url初始化客户端etcd::Client client(Host_url);//ls是指获取该/service key值下的所有value,在路径查找中通常只需要设置一个目录即可找到该目录下全部value值。返回异步对象auto resp=client.ls("/service").get();if(resp.is_ok()==false){cout<<"获取信息无效"<<endl;exit(1);}//获取keys,指的是符合/service下的文件名个数,以便于遍历auto sz=resp.keys().size();cout<<sz<<endl;for(int i=0;i<sz;i++){cout<<resp.value(i).as_string()<<"可以提供"<<resp.key(i)<<"服务"<<endl;}//监控装置,监视/service下的所有key-value,通常只监控它是否修改和是否删除//需要自己设置cback回调函数auto watcher=etcd::Watcher(client, "/service",cback, true);watcher.Wait();//等待。相当于启动监听装置return 0;
}
源码:
tips:txt中有各种test的使用样例
封装客户端
二次封装:封装etcd-client-api,
实现两种类型的客户端
1.服务注册客户端:向服务器新增服务信息数据,并进行保活
2.服务发现客户端:从服务器查找服务信息数据,并进行改变事件监控封装的时候,我们尽量减少模块之间的耦合度,本质上etcd是一个键值存储系统,并不是专门用于作为注册中心进行服务注册和发现的。
封装思想:
1.封装服务注册客户端类提供一个接口:向服务器新增数据并进行保活参数:注册中心地址(etcd服务器地址),新增的服务信息(服务名-主机地址键值对)封装服务发现客户端类
服务下线事件接口(数据删除)
2.封装服务发现客户端类
提供两个设置回调函数的接口:服务上线事件接口(数据新增),服务下线事件接口(数据删除)
代码
#pragma once
#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"#include <functional>
namespace common{
class Rigistry
{
public:using ptr=std::shared_ptr<Rigistry>;Rigistry(const string & Host): _client(std::make_shared<etcd::Client>(Host)), _keep_alive(_client->leasekeepalive(5).get()), _leaseid(_keep_alive->Lease()){}~Rigistry() { _keep_alive->Cancel(); }bool registry(const std::string& service, const std::string& host){auto ret = _client->put(service, host, _leaseid).get();if (ret.is_ok() == false){LOG_ERROR("客户端服务注册失败,{}", ret.error_message());return false;}return true;}private:std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::KeepAlive> _keep_alive;int64_t _leaseid;
};class Discovery
{
public:using ptr=std::shared_ptr<Discovery>;using NotifyCallback = std::function<void(const std::string&, const std::string&)>;Discovery(const std::string &Host,const std::string &basedir,const NotifyCallback& put_cb,const NotifyCallback& del_cb)//在 Discovery 对象构造的过程中,online 和 offonline 会发生隐式转换 转换成NotifyCallback类型//因此得+const & 或者值传递的方式: _put_cb(put_cb), _del_cb(del_cb), _client(std::make_shared<etcd::Client>(Host)){auto resp = _client->ls(basedir).get();if (resp.is_ok() == false){LOG_ERROR("客户端服务获取信息失败,{}", resp.error_message());}auto sz = resp.keys().size();for (int i = 0; i < sz; i++){if(put_cb){put_cb(resp.key(i),resp.value(i).as_string());LOG_DEBUG("新增服务:{}-{}",resp.key(i),resp.value(i).as_string());}}_watcher=(std::make_shared<etcd::Watcher>(*_client.get(), basedir, std::bind(&Discovery::cback, this, std::placeholders::_1), true));// Watcher(Client const& client, 。。。要求传入client,// 我们的_client被用shared_ptr封装了起来,得解引用 + get();}private:void cback(const etcd::Response &re){if (re.is_ok()==false){std::cout <<"收到一个错误的事间通知"<<re.error_message() << std::endl;LOG_ERROR("客户端服务回调函数信息失败,{}", re.error_message());}for (const auto &e : re.events()){if (e.event_type() == etcd::Event::EventType::PUT){if(_put_cb){_put_cb(e.kv().key(),e.kv().as_string());}LOG_DEBUG("新增服务:{}-{}",e.kv().key(),e.kv().as_string());}else if (e.event_type() == etcd::Event::EventType::DELETE_){if(_del_cb){_del_cb(e.prev_kv().key(),e.prev_kv().as_string());}LOG_DEBUG("下线服务:{}-{}",e.prev_kv().key(),e.prev_kv().as_string());}}}private:NotifyCallback _put_cb;NotifyCallback _del_cb;std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::Watcher> _watcher;
};
}