您的位置:首页 > 科技 > IT业 > 开发app最好的工具_邢台疫情防控最新规定_百度指数查询app_网页设计制作网站

开发app最好的工具_邢台疫情防控最新规定_百度指数查询app_网页设计制作网站

2025/4/3 22:12:45 来源:https://blog.csdn.net/realize_dream/article/details/144310716  浏览:    关键词:开发app最好的工具_邢台疫情防控最新规定_百度指数查询app_网页设计制作网站
开发app最好的工具_邢台疫情防控最新规定_百度指数查询app_网页设计制作网站

1、DeltaFIFO 定义分析

DeltaFIFO 是一个增量的本地队列,记录资源对象的变化过程。它生产者是 Reflector 组件,将监听到的对象,同步到 DeltaFIFO 中,DeltaFIFO 又对资源对象做了什么呢。

DeltaFIFO 结构体:

type DeltaFIFO struct {  lock sync.RWMutex  // 读写锁,操作 DeltaFIFO 中的items与queue之前都要先加锁;cond sync.Cond     // 条件变量,唤醒等待的协程items map[string]Deltas  // Delta 存储桶,key根据对象计算,value为Deltas类型;queue []string           // 存储对象key的队列populated bool initialPopulationCount int  keyFunc KeyFunc         // 计算对象key的函数knownObjects KeyListerGetter    // 列出已知的对象closed bool   // 队列是否被关闭emitDeltaTypeReplaced bool  
}type Deltas []Deltatype Delta struct {  Type   DeltaType  // 资源对象变化类型,如Added、Updated、Deleted、Sync、ReplacedObject interface{}  // 存储的资源对象,如pod等资源对象
}

从 DeltaFIFO 数据结构来看,里头存储着 map[obj key]Deltas 和 object queue。Delta 装有对象数据及对象的变化类型。其中 DeltaType 就是资源变化的类型, 比如 Add、Update 等;Delta Object 就是具体的 Kubernetes 资源对象, 如pod等资源对象。Reflector 负责 DeltaFIFO 的输入,Controller 负责处理 DeltaFIFO 的输出。

2、DeltaFIFO 初始化

可以看到 NewDeltaFIFOWithOptions 中初始化了一个 items 和 queue 都为空的 DeltaFIFO 并返回。

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {  return NewDeltaFIFOWithOptions(DeltaFIFOOptions{  KeyFunction:  keyFunc,  KnownObjects: knownObjects,  })  
}  func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {  if opts.KeyFunction == nil {  opts.KeyFunction = MetaNamespaceKeyFunc  }  f := &DeltaFIFO{  items:        map[string]Deltas{},  queue:        []string{},  keyFunc:      opts.KeyFunction,  knownObjects: opts.KnownObjects,  emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,  }  f.cond.L = &f.lock  return f  
}

3、DeltaFIFO 核心方法分析

通过前面的 Reflector 组件分析时,watchHandler() 方法有个循环处理 event 事件里面,调用的如 store.Add() 等。可知 Reflector里的 store.Add() 其实就是 DeltaFIFO 的 Replace、Add、Update、Delete方法的。

func watchHandler(...) error {  
loop:  for {  select {  case <-stopCh:  return errorStopRequested  case err := <-errc:  return err  case event, ok := <-w.ResultChan():  // 不同类型的event事件,调用不同函数处理。如事件为Added则调用 store.Add 处理  switch event.Type {  case watch.Added:  err := store.Add(event.Object)  // ...  case watch.Modified:  err := store.Update(event.Object)  // ...   case watch.Deleted:  // ...  case watch.Bookmark:  default:    }  }  
}

资源对象从 DeltaFIFO 中 Pop 出去后又经过了哪些处理呢?
在 sharedIndexInformer.Run 方法中调用 NewDeltaFIFOWithOptions 初始化了 DeltaFIFO,然后将DeltaFIFO 作为参数传入初始化Config。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {  // ...fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{  KnownObjects:          s.indexer,  EmitDeltaTypeReplaced: true,  })  cfg := &Config{  Queue:            fifo,  ListerWatcher:    s.listerWatcher,  ObjectType:       s.objectType,  FullResyncPeriod: s.resyncCheckPeriod,  RetryOnError:     false,  ShouldResync:     s.processor.shouldResync,  Process:           s.HandleDeltas,  WatchErrorHandler: s.watchErrorHandler,  }  func() {  s.controller = New(cfg)  }()  // ...s.controller.Run(stopCh)  
}

在controller的Run方法中,调用NewReflector初始化Reflector时,同步了 DeltaFIFO,并且执行 processLoop 方法。processLoop 重要的逻辑是循环调用 c.config.Queue.Pop 将 DeltaFIFO 中的队头元素给pop出来(实际上pop出来的是Deltas,是Delta的切片类型),然后调用 c.config.Process 方法来做处理,当处理出错时,再调用 c.config.Queue.AddIfNotPresent 将对象重新加入到DeltaFIFO中去。

func (c *controller) Run(stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()  go func() {  <-stopCh  c.config.Queue.Close()  }()  // 调用NewReflector,初始化Reflector  r := NewReflector(  c.config.ListerWatcher,  c.config.ObjectType,  c.config.Queue,  c.config.FullResyncPeriod,  )  r.ShouldResync = c.config.ShouldResync  r.WatchListPageSize = c.config.WatchListPageSize  r.clock = c.clock  if c.config.WatchErrorHandler != nil {  r.watchErrorHandler = c.config.WatchErrorHandler  }  c.reflectorMutex.Lock()  c.reflector = r  c.reflectorMutex.Unlock()  var wg wait.Group  wg.StartWithChannel(stopCh, r.Run)  // 调用c.processLoop,开始controller的核心处理;  wait.Until(c.processLoop, time.Second, stopCh)  wg.Wait()  
}
// controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop  
// 将DeltaFIFO中的队头元素给pop出来(实际上pop出来的是Deltas,是Delta的切片类型),  
// 然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。
func (c *controller) processLoop() {  for {  obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))  if err != nil {  if err == ErrFIFOClosed {  return  }  if c.config.RetryOnError {  // This is the safe way to re-enqueue.  c.config.Queue.AddIfNotPresent(obj)  }  }  }  
}

接下来分析 DeltaFIFO 核心处理方法 Pop、Replace、Add、Update、Delete方法。
源码位置:staging/src/k8s.io/client-go/tools/cache/delta_fifo.go

3.1、Add 方法

主要逻辑:

  • (1)上锁
  • (2)调用f.queueActionLocked,操作 DeltaFIFO中的queue与Deltas,根据对象key构造Added类型的新Delta追加到相应的Deltas中
  • (3)解锁

func (f *DeltaFIFO) Add(obj interface{}) error {  f.lock.Lock()         // 上锁  defer f.lock.Unlock() // 解锁  f.populated = true  return f.queueActionLocked(Added, obj)  
}func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {  // 计算对象key的值  id, err := f.KeyOf(obj)  if err != nil {  return KeyError{obj, err}  }  // 构造新的 Delta,将新的 Delta 追加到 Deltas 末尾  oldDeltas := f.items[id]  newDeltas := append(oldDeltas, Delta{actionType, obj})  // 去重,目前只将Deltas最末尾的两个delete类型的Delta去重  newDeltas = dedupDeltas(newDeltas)  if len(newDeltas) > 0 {  // 判断对象的key是否在queue中,不在则添加入queue  if _, exists := f.items[id]; !exists {  f.queue = append(f.queue, id)  }  // 更新 items 中的Deltas  f.items[id] = newDeltas  // 通知所有的消费者解除阻塞  f.cond.Broadcast()  } else {  // ...  }  return nil  
}// 去重函数
func dedupDeltas(deltas Deltas) Deltas {  n := len(deltas)  if n < 2 {  return deltas  }  a := &deltas[n-1]  b := &deltas[n-2]  if out := isDup(a, b); out != nil {  deltas[n-2] = *out  return deltas[:n-1]  }  return deltas  
}func isDup(a, b *Delta) *Delta {  if out := isDeletionDup(a, b); out != nil {  return out  }  return nil  
}  func isDeletionDup(a, b *Delta) *Delta {  if b.Type != Deleted || a.Type != Deleted {  return nil  }  if _, ok := b.Object.(DeletedFinalStateUnknown); ok {  return a  }  return b  
}

3.2、pop 方法

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {  // 1.首先加锁  f.lock.Lock()  defer f.lock.Unlock()  //最后释放锁// 2、循环判断queue的长度是否为0,为0则阻塞住,调用f.cond.Wait(),等待通知;  // 将Add方法中的 queueActionLocked方法中的f.cond.Broadcast()相对应,它会通知所有的消费者解除阻塞)  for {  for len(f.queue) == 0 {  if f.closed {  return nil, ErrFIFOClosed  }  f.cond.Wait()  }  // 3、取出队列头对象key  id := f.queue[0]  // 4、把第一个对象key给pop出去  f.queue = f.queue[1:]  depth := len(f.queue)  // 5、initialPopulationCount变量减1,  // 当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成  if f.initialPopulationCount > 0 {  f.initialPopulationCount--  }  // 6、从items中获取对象  item, ok := f.items[id]  if !ok {  // This should never happen  klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)  continue  }  // 7、把对象从items中删除  delete(f.items, id)       if depth > 10 {  trace := utiltrace.New("DeltaFIFO Pop Process",  utiltrace.Field{Key: "ID", Value: id},  utiltrace.Field{Key: "Depth", Value: depth},  utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})  defer trace.LogIfLong(100 * time.Millisecond)  }  // 8、处理pop出来的对象(调用PopProcessFunc处理函数)  err := process(item)  if e, ok := err.(ErrRequeue); ok {  f.addIfNotPresent(id, item)  err = e.Err  }   }  
}

3.3、Replace 方法

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {  f.lock.Lock()  defer f.lock.Unlock()  keys := make(sets.String, len(list))  // keep backwards compat for old clients  action := Sync  if f.emitDeltaTypeReplaced {  action = Replaced  }  // 遍历所有对象并且计算key,循环调用f.queueActionLocked,  // 操作DeltaFIFO中的queue与Deltas,根据对象key构造Sync类型的新Delta追加到相应的Deltas中  for _, item := range list {  key, err := f.KeyOf(item)  if err != nil {  return KeyError{item, err}  }  keys.Insert(key)  if err := f.queueActionLocked(action, item); err != nil {  return fmt.Errorf("couldn't enqueue object: %v", err)  }  }  if f.knownObjects == nil {  queuedDeletions := 0  for k, oldItem := range f.items {  if keys.Has(k) {  continue  }          var deletedObj interface{}  if n := oldItem.Newest(); n != nil {  deletedObj = n.Object  }  queuedDeletions++  if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {  return err  }  }  if !f.populated {  f.populated = true          f.initialPopulationCount = keys.Len() + queuedDeletions  }  return nil  }  knownKeys := f.knownObjects.ListKeys()  queuedDeletions := 0  // 如果进来Replace方法的list中没有的key,就继续调用f.queueActionLocked处理  for _, k := range knownKeys {  if keys.Has(k) {  continue  }  deletedObj, exists, err := f.knownObjects.GetByKey(k)  if err != nil {  deletedObj = nil  klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)  } else if !exists {  deletedObj = nil  klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)  }  queuedDeletions++  if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {  return err  }  }  if !f.populated {  f.populated = true  f.initialPopulationCount = keys.Len() + queuedDeletions  }  return nil  
}

到这里 DeltaFIFO 分析基本结束,下一遍分析 client-go Indexer

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com