// New creates a new Kubernetes discovery for the given role.funcNew(l log.Logger, conf *SDConfig)(*Discovery,error){if l ==nil{l = log.NewNopLogger()}var(kcfg *rest.Configerr error)if conf.KubeConfig !=""{kcfg, err = clientcmd.BuildConfigFromFlags("", conf.KubeConfig)if err !=nil{returnnil, err}}elseif conf.APIServer.URL ==nil{// Use the Kubernetes provided pod service account// as described in https://kubernetes.io/docs/admin/service-accounts-admin/kcfg, err = rest.InClusterConfig()if err !=nil{returnnil, err}level.Info(l).Log("msg","Using pod service account via in-cluster config")}else{rt, err := config.NewRoundTripperFromConfig(conf.HTTPClientConfig,"kubernetes_sd", config.WithHTTP2Disabled())if err !=nil{returnnil, err}kcfg =&rest.Config{Host: conf.APIServer.String(),Transport: rt,}}kcfg.UserAgent = userAgentc, err := kubernetes.NewForConfig(kcfg)if err !=nil{returnnil, err}return&Discovery{client: c,logger: l,role: conf.Role,namespaceDiscovery:&conf.NamespaceDiscovery,discoverers:make([]discovery.Discoverer,0),selectors:mapSelector(conf.Selectors),},nil}
var wg sync.WaitGroupfor_, dd :=range d.discoverers {wg.Add(1)gofunc(d discovery.Discoverer){defer wg.Done()d.Run(ctx, ch)}(dd)}
对应 role=node 的 Run 方法（Node.Run）：
// Run implements the Discoverer interface.func(n *Node)Run(ctx context.Context, ch chan<-[]*targetgroup.Group){defer n.queue.ShutDown()if!cache.WaitForCacheSync(ctx.Done(), n.informer.HasSynced){if ctx.Err()!= context.Canceled {level.Error(n.logger).Log("msg","node informer unable to sync cache")}return}gofunc(){for n.process(ctx, ch){}}()// Block until the target provider is explicitly canceled.<-ctx.Done()}
func(m *Manager)updater(ctx context.Context, p *provider, updates chan[]*targetgroup.Group){for{select{case<-ctx.Done():returncase tgs, ok :=<-updates:receivedUpdates.WithLabelValues(m.name).Inc()if!ok {level.Debug(m.logger).Log("msg","Discoverer channel closed","provider", p.name)return}for_, s :=range p.subs {m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)}select{case m.triggerSend <-struct{}{}:default:}}}}
updater 调用 updateGroup，将收到的 target group 更新到 Manager 的 targets map 中：
func(m *Manager)updateGroup(poolKey poolKey, tgs []*targetgroup.Group){m.mtx.Lock()defer m.mtx.Unlock()if_, ok := m.targets[poolKey];!ok {m.targets[poolKey]=make(map[string]*targetgroup.Group)}for_, tg :=range tgs {if tg !=nil{// Some Discoverers send nil target group so need to check for it to avoid panics.m.targets[poolKey][tg.Source]= tg}}}
{
	// Scrape manager.

	// runScrape blocks until the configuration has been loaded, then runs
	// the scrape manager on the discovery manager's sync channel.
	runScrape := func() error {
		// When the scrape manager receives a new targets list
		// it needs to read a valid config for each job.
		// It depends on the config being in sync with the discovery manager so
		// we wait until the config is fully loaded.
		<-reloadReady.C

		runErr := scrapeManager.Run(discoveryManagerScrape.SyncCh())
		level.Info(logger).Log("msg", "Scrape manager stopped")
		return runErr
	}

	// stopScrape is the interrupt function; the error it receives is not used.
	stopScrape := func(_ error) {
		// Scrape manager needs to be stopped before closing the local TSDB
		// so that it doesn't try to write samples to a closed storage.
		level.Info(logger).Log("msg", "Stopping scrape manager...")
		scrapeManager.Stop()
	}

	g.Add(runScrape, stopScrape)
}