在 Kubernetes 中使用 client-go
实现 drain
功能涉及多个步骤,需要模仿 kubectl drain
的行为。这包括将节点标记为不可调度(cordon)、驱逐 Pod,并处理 DaemonSet 和不可驱逐 Pod 的逻辑。以下是实现 drain
的主要步骤:
1. 基本思路
drain
的核心步骤包括:
- 标记节点为不可调度(cordon)。
- 列出节点上的所有 Pod。
- 逐一驱逐可驱逐的 Pod。
- 忽略 DaemonSet 创建的 Pod。
- 检查
PodDisruptionBudget
(PDB)约束。
- 等待 Pod 驱逐完成。
2. 代码实现
以下是一个基于 client-go
的示例代码,演示如何实现 drain
功能。
package mainimport ("context""fmt""time"v1 "k8s.io/api/core/v1"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.io/apimachinery/pkg/util/wait""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/clientcmd"
)func main() {// 加载 kubeconfigconfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")if err != nil {panic(err)}clientset, err := kubernetes.NewForConfig(config)if err != nil {panic(err)}nodeName := "your-node-name"err = drainNode(clientset, nodeName)if err != nil {fmt.Printf("Failed to drain node: %v\n", err)} else {fmt.Printf("Node %s drained successfully\n", nodeName)}
}// DrainNode 实现 drain 功能
func drainNode(clientset *kubernetes.Clientset, nodeName string) error {ctx := context.TODO()// Step 1: 将节点标记为不可调度if err := cordonNode(clientset, nodeName); err != nil {return fmt.Errorf("failed to cordon node: %w", err)}// Step 2: 获取节点上的所有 PodpodList, err := listPodsOnNode(clientset, nodeName)if err != nil {return fmt.Errorf("failed to list pods on node: %w", err)}// Step 3: 驱逐所有可驱逐的 Podfor _, pod := range podList.Items {if isDaemonSetPod(&pod) || isMirrorPod(&pod) {// 忽略 DaemonSet 和 Mirror Podcontinue}// 驱逐 Poderr := evictPod(clientset, &pod)if err != nil {return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err)}}// Step 4: 等待所有 Pod 被驱逐err = wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) {remainingPods, err := listPodsOnNode(clientset, nodeName)if err != nil {return false, err}// 检查是否还有非 DaemonSet/Mirror 的 Podfor _, pod := range remainingPods.Items {if !isDaemonSetPod(&pod) && !isMirrorPod(&pod) {return false, nil}}return true, nil})if err != nil {return fmt.Errorf("timeout waiting for pods to be evicted: %w", err)}return nil
}// 将节点标记为不可调度
func cordonNode(clientset *kubernetes.Clientset, nodeName string) error {ctx := context.TODO()node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})if err != nil {return err}if node.Spec.Unschedulable {// 已经是不可调度状态return nil}node.Spec.Unschedulable = true_, err = clientset.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})return err
}// 获取节点上的所有 Pod
func listPodsOnNode(clientset *kubernetes.Clientset, nodeName string) (*v1.PodList, error) {ctx := context.TODO()podList, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),})if err != nil {return nil, err}return podList, nil
}// 检查是否为 DaemonSet 创建的 Pod
func isDaemonSetPod(pod *v1.Pod) bool {for _, owner := range pod.OwnerReferences {if owner.Kind == "DaemonSet" {return true}}return false
}// 检查是否为 Mirror Pod
func isMirrorPod(pod *v1.Pod) bool {_, exists := pod.Annotations["kubernetes.io/config.mirror"]return exists
}// 驱逐 Pod
func evictPod(clientset *kubernetes.Clientset, pod *v1.Pod) error {ctx := context.TODO()eviction := &v1.Eviction{ObjectMeta: metav1.ObjectMeta{Name: pod.Name,Namespace: pod.Namespace,},}err := clientset.CoreV1().Pods(pod.Namespace).Evict(ctx, eviction)if err != nil {return err}return nil
}
3. 实现细节说明
-
节点 Cordon:
- 使用
clientset.CoreV1().Nodes().Update
更新节点的Unschedulable
字段。
- 使用
-
列出节点上的 Pod:
- 使用
FieldSelector
过滤出运行在目标节点上的 Pod。
- 使用
-
DaemonSet 和 Mirror Pod:
- 通过 Pod 的
OwnerReferences
判断是否属于 DaemonSet。 - 通过检查
kubernetes.io/config.mirror
注解判断是否为 Mirror Pod。
- 通过 Pod 的
-
Pod 驱逐:
- 使用
Eviction
对象发起驱逐请求。 - 需要确保 Pod 不违反
PodDisruptionBudget
。
- 使用
-
等待驱逐完成:
- 使用
wait.PollImmediate
轮询检查节点上的 Pod 是否已被全部驱逐。
- 使用
4. 注意事项
-
权限要求:
- 执行
cordon
和evict
操作需要相应的集群权限(如nodes/update
和pods/evict
)。
- 执行
-
DaemonSet Pod 的处理:
- 默认忽略 DaemonSet Pod,如需强制移除需特殊处理。
-
驱逐策略:
- 驱逐操作可能受到
PodDisruptionBudget
的限制,需要确保驱逐不会违反 PDB。
- 驱逐操作可能受到
-
资源规划:
- 确保其他节点有足够的资源接收被驱逐的 Pod。
以上代码和说明提供了一个完整的 client-go
实现 drain
的方法,适合在自定义工具或自动化运维场景中使用。