-
HTTPPool
既可以是服务端,也可以是客户端,这取决于特定的使用场景和上下文:- 作为客户端:当本地缓存没有找到需要的数据时,
HTTPPool
需要作为客户端,通过httpGetter
(实现了PeerGetter
接口)去其他的远程节点(服务端)请求数据。 - 作为服务端:当其他节点的
HTTPPool
实例(作为客户端)请求当前节点存储的数据时,当前节点的HTTPPool
实例则作为服务端响应这些请求。
- 作为客户端:当本地缓存没有找到需要的数据时,
-
HTTPPool
实现了PeerPicker
接口,httpGetter
实现了PeerGetter
接口,前者用来找到对应的远程节点(由于每个远程节点对应一个httpGetter
,因此实际找到的是httpGetter
),后者用来向远处节点请求数据 -
Peer.go
-
package geetype PeerPicker interface {PickPeer(key string) (peer PeerGetter, ok bool) }type PeerGetter interface {Get(group string, key string) ([]byte, error) }
-
PickerPeer
用来找到key
对应的远程节点,远程节点和PeerGetter
一一对应,找到PeerGetter就找到对应的远程节点 -
PeerGetter
根据group
和key
向远程节点获取数据
-
-
url.QueryEscape()
:net/url
包,用于将url
中的特殊字符用asll
码进行替换,防止丢失 -
var _ PeerGetter = (*httpGetter)(nil)
:检验接口实现是否正确 -
day5
需要实现http的客户端,对于缓存未命中时,需要向远程节点请求数据,此时被作为客户端,http.go
中已经在day2
实现了服务端的代码,需要再加上客户端-
HTTPPool
实现了上面的PeerPicker
接口,实现了其需要实现的PickPeer()
函数,该函数返回远程节点对应的PeerGetter
,找到PeerGetter
就能去访问远程节点的数据-
// 查找PeerGetter func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {p.mu.Lock()defer p.mu.Unlock()if peer := p.peers.Get(key); peer != "" && peer != p.self {return p.httpGetters[peer], true}return nil, false } var _ PeerPicker = (*HTTPPool)(nil) // 检验接口实现是否正确
-
-
``httpGetter
实现了
PeerGetter接口,实现了其需要实现的
Get()函数,
Get()函数主要是吊桶
net/http中的
Get()方法,向远程节点请求数据,并使用
ioutil的
ReadAll`获取-
func (h *httpGetter) Get(group string, key string) ([]byte, error) {u := fmt.Sprintf("%s%s%s",h.baseURL,url.QueryEscape(group),url.QueryEscape(key),)res, err := http.Get(u)if err != nil {return nil, err}defer res.Body.Close()if res.StatusCode != http.StatusOK {return nil, fmt.Errorf("error response body: %v", err)}bytes, err := ioutil.ReadAll(res.Body)if err != nil {return nil, fmt.Errorf("error response body: %v", err)}return bytes, nil } var _ PeerGetter = (*httpGetter)(nil) // 检验接口实现是否正确
-
-
HTTPPool
管理客户端的所有过程,因此需要加入httpGetter
属性,其是一个map[string]*httpGetter
类型,通过key
映射httpGetter
,同时加入day4
中实现的一致性哈希过程,由于这些需要实现互斥访问哈希表,因此还需要一个mutex
-
type HTTPPool struct {self stringbasePath stringmu sync.Mutexpeers *consistenthash.Map // 缓存值httpGetters map[string]*httpGetter // 每个httpGetter对应一个远程节点,在缓存未命中时向远程节点请求数据 }
-
-
最后,将
HTTPPool
集成在主流程中,即group
中,group
需要实现RegisterPeer()
函数,将对应的HTTPPool
注入,同时封装getFromPeer()
方法,其中调用了之前的Get()
方法,用来请求远程节点的数据,重写一下load()
方法,在不存在HTTPPool
时才调用getLocally()
,否则调用getFromPeer
-
type Group struct {name string // 缓存名称getter Getter // 回调函数mainCache cache // 缓存peers PeerPicker // 集成HTTPPool } // 将HTTPPool注入到group中 func (g *Group) RegisterPeers(peer PeerPicker) {if g.peers != nil {panic("最多一个peerPicker")}g.peers = peer }// 缓存未命中 向远程节点请求 func (g *Group) load(key string) (Value ByteView, err error) {if g.peers != nil {if peer, ok := g.peers.PickPeer(key); ok {if value, err := g.getFromPeer(peer, key); err == nil {return value, nil}log.Println("获取Peer失败")}}// 不存在远程节点 调用回调函数return g.getLocally(key) }// 向远处节点请求 func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {bytes, err := peer.Get(g.name, key)if err != nil {return ByteView{}, err}return ByteView{b: bytes}, nil}
-
-
因此,流程如下
-
是 接收 key --> 检查是否被缓存 -----> 返回缓存值 ⑴| 否 是|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 ⑵| 否|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 ⑶
-
-