Custom Kubernetes Controllers

The Kubernetes Operator Pattern

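Controllers are a key component of Kubernetes, which relies on them to keep the cluster in its desired state. For example, the ReplicaSet Controller maintains a set of Pods according to the user's configuration. Besides using the built-in resources and Controllers, users can define their own resources and Controllers through Kubernetes' extension mechanisms.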

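The Kubernetes Operator pattern lets us implement our own Controllers. Suppose we define a resource called CR. Whenever a CR resource is created, updated, or deleted, a corresponding event is triggered. The CR Controller watches for these events and handles them.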

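The controller-runtime library provides a concrete implementation of the Operator pattern, and reading its source code is a good way to understand the pattern. In controller-runtime, a Controller is managed by a Manager and delegates the actual work to a Reconciler. For example, a Controller for the CR resource can be created as follows: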

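// Complete builds the Controller, registers it with the Manager and returns an error on failure.
err := ctrl.NewControllerManagedBy(manager).
    For(&CR{}).
    Complete(reconciler)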

When a Controller starts, it does the following:

  • Creates a work queue that holds Request objects.

  • Starts the Watchers and supplies EventHandlers that listen for events (Add / Update / Delete) on the resources the Controller manages.

  • Starts the Reconcile workers. There is one worker by default, and the MaxConcurrentReconciles option lets several run in parallel.

func (c *Controller) Start(ctx context.Context) error {
    // ...
    c.Queue = c.MakeQueue()
    // ...
    for _, watch := range c.startWatches {
        if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
            return err
        }
    }
    // ...
    wg := &sync.WaitGroup{}
    wg.Add(c.MaxConcurrentReconciles)
    for i := 0; i < c.MaxConcurrentReconciles; i++ {
        go func() {
            defer wg.Done()
            // Run a worker thread that just dequeues items, processes them, and marks them done.
            // It enforces that the reconcileHandler is never invoked concurrently with the same object.
            for c.processNextWorkItem(ctx) {
            }
        }()
    }
    // ...
    <-ctx.Done()
    wg.Wait()
    return nil
}
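The startup sequence above can be modeled with the standard library alone. The sketch below is a simplified stand-in, not controller-runtime's code. A buffered channel plays the work queue, and startWorkers (a hypothetical name) launches maxConcurrentReconciles workers. As in Start, it returns only after the context is cancelled and every worker has exited. Unlike the real work queue, a channel does not deduplicate keys.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// startWorkers is a simplified model of Controller.Start: it launches
// maxConcurrentReconciles goroutines that take keys from queue and pass
// them to reconcile, then blocks until ctx is cancelled and every worker
// has returned.
func startWorkers(ctx context.Context, queue <-chan string, maxConcurrentReconciles int, reconcile func(string)) {
	var wg sync.WaitGroup
	wg.Add(maxConcurrentReconciles)
	for i := 0; i < maxConcurrentReconciles; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // shutting down: take no more work
				case key := <-queue:
					reconcile(key)
				}
			}
		}()
	}
	<-ctx.Done()
	wg.Wait()
}

// demo fills a buffered channel (the "work queue") with keys, runs the
// workers, cancels once every key has been reconciled, and returns how
// many distinct keys were seen.
func demo(keys []string, workers int) int {
	queue := make(chan string, len(keys))
	for _, k := range keys {
		queue <- k
	}

	var mu sync.Mutex
	seen := map[string]bool{}
	var pending sync.WaitGroup
	pending.Add(len(keys))

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		pending.Wait() // every key has been handled
		cancel()
	}()

	startWorkers(ctx, queue, workers, func(key string) {
		mu.Lock()
		seen[key] = true
		mu.Unlock()
		pending.Done()
	})
	return len(seen)
}

func main() {
	fmt.Println(demo([]string{"default/a", "default/b", "default/c"}, 2))
}
```

demo cancels the context only after every key has been handled, so the count it returns does not depend on goroutine scheduling.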

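The Request objects stored in the work queue have the structure below. Note that a Request contains only the Namespace and Name of the object that triggered the event. It carries neither the object's contents nor any information about the event itself.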

// Request contains the information necessary to reconcile a Kubernetes object.
// This includes the information to uniquely identify the object - its Name and Namespace.
// It does NOT contain information about any specific Event or the object contents itself.
type Request struct {
    // NamespacedName is the name and namespace of the object to reconcile.
    types.NamespacedName
}

// NamespacedName is defined in k8s.io/apimachinery/pkg/types.
type NamespacedName struct {
    Namespace string
    Name      string
}
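Because a Request carries only a Namespace and Name, several events for the same object collapse into one queue entry. The sketch below models that idea with hypothetical types. namespacedName stands in for types.NamespacedName and renders as namespace/name, the form seen in the logs. A slice plus a set stands in for the work queue. All events arrive before any processing here, so every duplicate is dropped.

```go
package main

import "fmt"

// namespacedName mirrors the two fields of types.NamespacedName.
type namespacedName struct {
	Namespace string
	Name      string
}

// String renders the key as "namespace/name".
func (n namespacedName) String() string {
	return n.Namespace + "/" + n.Name
}

// request mirrors reconcile.Request: it identifies an object, nothing more.
type request struct {
	namespacedName
}

// event is a hypothetical event record; only the object's identity survives
// the conversion into a request.
type event struct {
	Kind      string // "Add", "Update" or "Delete"
	Namespace string
	Name      string
}

// enqueue converts events into requests, skipping a key that is already
// waiting, the way a work queue ignores duplicate Adds.
func enqueue(events []event) []request {
	seen := map[request]bool{}
	var queue []request
	for _, e := range events {
		r := request{namespacedName{Namespace: e.Namespace, Name: e.Name}}
		if !seen[r] {
			seen[r] = true
			queue = append(queue, r)
		}
	}
	return queue
}

func main() {
	q := enqueue([]event{
		{"Add", "default", "hello-sample"},
		{"Update", "default", "hello-sample"},
		{"Update", "default", "other"},
		{"Delete", "default", "hello-sample"},
	})
	for _, r := range q {
		fmt.Println(r) // String() is promoted from the embedded namespacedName
	}
}
```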

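EventHandler defines a set of callback functions. When a custom resource is created, updated, or deleted, the matching callback first checks the event against the Predicates. It then passes the event to the wrapped handler.EventHandler, which adds a Request containing the resource's Namespace and Name to the work queue. For resources registered with For, that handler is handler.EnqueueRequestForObject.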

type EventHandler struct {
    EventHandler handler.EventHandler
    Queue        workqueue.RateLimitingInterface
    Predicates   []predicate.Predicate
}

// OnAdd creates CreateEvent and calls Create on EventHandler
func (e EventHandler) OnAdd(obj interface{}) {
    c := event.CreateEvent{}

    // Pull Object out of the object
    if o, ok := obj.(client.Object); ok {
        c.Object = o
    } else {
        log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj))
        return
    }

    for _, p := range e.Predicates {
        if !p.Create(c) {
            return
        }
    }

    // Invoke create handler
    e.EventHandler.Create(c, e.Queue)
}

// OnUpdate creates UpdateEvent and calls Update on EventHandler
func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
    u := event.UpdateEvent{}

    if o, ok := oldObj.(client.Object); ok {
        u.ObjectOld = o
    } else {
        log.Error(nil, "OnUpdate missing ObjectOld", "object", oldObj, "type", fmt.Sprintf("%T", oldObj))
        return
    }

    // Pull Object out of the object
    if o, ok := newObj.(client.Object); ok {
        u.ObjectNew = o
    } else {
        log.Error(nil, "OnUpdate missing ObjectNew", "object", newObj, "type", fmt.Sprintf("%T", newObj))
        return
    }

    for _, p := range e.Predicates {
        if !p.Update(u) {
            return
        }
    }

    // Invoke update handler
    e.EventHandler.Update(u, e.Queue)
}

// OnDelete creates DeleteEvent and calls Delete on EventHandler
func (e EventHandler) OnDelete(obj interface{}) {
    d := event.DeleteEvent{}

    // ...

    // Pull Object out of the object
    if o, ok := obj.(client.Object); ok {
        d.Object = o
    } else {
        log.Error(nil, "OnDelete missing Object", "object", obj, "type", fmt.Sprintf("%T", obj))
        return
    }

    for _, p := range e.Predicates {
        if !p.Delete(d) {
            return
        }
    }

    // Invoke delete handler
    e.EventHandler.Delete(d, e.Queue)
}
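Every callback above follows the same shape: unwrap the object, let each Predicate veto the event, then hand it to the wrapped handler, which enqueues a key. The sketch below models only the Update path with stdlib types. The names object, updateEvent, predicate and generationChanged are hypothetical. The filter follows the idea of controller-runtime's predicate.GenerationChangedPredicate, which skips updates that leave metadata.generation unchanged, such as status-only writes.

```go
package main

import "fmt"

// object is a hypothetical, minimal stand-in for client.Object.
type object struct {
	Namespace  string
	Name       string
	Generation int64
}

// updateEvent mirrors the shape of event.UpdateEvent.
type updateEvent struct {
	ObjectOld, ObjectNew object
}

// predicate returns false to drop an event before it reaches the handler.
type predicate func(updateEvent) bool

// generationChanged drops updates that did not change the generation.
func generationChanged(e updateEvent) bool {
	return e.ObjectOld.Generation != e.ObjectNew.Generation
}

// onUpdate runs the predicates and, if none vetoes the event, enqueues the
// new object's "namespace/name" key (roughly what EnqueueRequestForObject does).
func onUpdate(e updateEvent, preds []predicate, queue *[]string) {
	for _, p := range preds {
		if !p(e) {
			return
		}
	}
	*queue = append(*queue, e.ObjectNew.Namespace+"/"+e.ObjectNew.Name)
}

func main() {
	var queue []string
	preds := []predicate{generationChanged}

	old := object{"default", "hello-sample", 1}
	onUpdate(updateEvent{old, object{"default", "hello-sample", 1}}, preds, &queue) // status-only: dropped
	onUpdate(updateEvent{old, object{"default", "hello-sample", 2}}, preds, &queue) // spec change: enqueued

	fmt.Println(queue)
}
```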

Each Reconcile worker tries to take a Request object from the work queue. If the queue is empty, the call blocks; otherwise, the worker processes the Request.

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    obj, shutdown := c.Queue.Get()
    if shutdown {
        // Stop working
        return false
    }
    defer c.Queue.Done(obj)
    // ...
    c.reconcileHandler(ctx, obj)
    return true
}
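The deferred c.Queue.Done(obj) call matters because of how the client-go work queue tracks keys. A key that is Added again while a worker is processing it is parked, and it becomes visible again only after Done. This is what guarantees that two workers never reconcile the same object at once. The toy queue below reproduces just that dirty/processing bookkeeping. It is an illustration under that assumption, not client-go's implementation, and it has no blocking, locking or shutdown.

```go
package main

import "fmt"

// toyQueue models the dirty/processing bookkeeping of client-go's workqueue.
type toyQueue struct {
	queue      []string
	dirty      map[string]bool // keys waiting to be processed
	processing map[string]bool // keys currently held by a worker
}

func newToyQueue() *toyQueue {
	return &toyQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add marks key dirty; it is only queued if no worker currently holds it.
func (q *toyQueue) Add(key string) {
	if q.dirty[key] {
		return // already pending: deduplicated
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // parked until Done
	}
	q.queue = append(q.queue, key)
}

// Get hands the next key to a worker; ok is false when the queue is empty
// (the real queue would block instead).
func (q *toyQueue) Get() (key string, ok bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	key, q.queue = q.queue[0], q.queue[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, true
}

// Done releases key; if it was re-added meanwhile, it is queued again now.
func (q *toyQueue) Done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		q.queue = append(q.queue, key)
	}
}

func main() {
	q := newToyQueue()
	q.Add("default/hello-sample")

	key, _ := q.Get()             // worker 1 starts processing
	q.Add("default/hello-sample") // an event arrives mid-reconcile
	_, ok := q.Get()              // worker 2 finds nothing to do
	fmt.Println(ok)               // false
	q.Done(key)                   // worker 1 finishes
	again, ok := q.Get()
	fmt.Println(again, ok) // default/hello-sample true
}
```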

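How a Request is processed depends on the Reconciler supplied when the Controller is created. As the interface definition shows, a Reconciler must provide at least a Reconcile function. That function takes a Request object and uses its return values to tell the work queue how to handle the Request next.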

Because a Request contains only the resource's Namespace and Name, the Reconciler must query the cluster itself to obtain the current state of the relevant resources.

// Reconciliation is level-based, meaning action isn't driven off changes in individual Events, but instead is
// driven by actual cluster state read from the apiserver or a local cache.
// For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted,
// instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
type Reconciler interface {
    // Reconciler performs a full reconciliation for the object referred to by the Request.
    // The Controller will requeue the Request to be processed again if an error is non-nil or
    // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
    Reconcile(context.Context, Request) (Result, error)
}

// Result contains the result of a Reconciler invocation.
type Result struct {
    // Requeue tells the Controller to requeue the reconcile key.  Defaults to false.
    Requeue bool

    // RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
    // Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
    RequeueAfter time.Duration
}
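The level-based contract described in these comments can be illustrated without Kubernetes at all. In the hypothetical sketch below, reconcile never sees an event. It compares a desired Pod count with the Pods it observes in a map that stands in for the cluster, then converges the map. Calling it once or several times for the same key leaves the same end state, which is why collapsing duplicate Requests is safe.

```go
package main

import "fmt"

// cluster is a stand-in for state read from the API server or cache:
// pod name -> owner name.
type cluster map[string]string

// reconcile is level-based: it derives its actions from the observed state,
// not from whichever event triggered it.
func reconcile(c cluster, owner string, desired int) {
	var owned []string
	for pod, o := range c {
		if o == owner {
			owned = append(owned, pod)
		}
	}
	// Create missing Pods, skipping names that already exist.
	for i := 0; len(owned) < desired; i++ {
		name := fmt.Sprintf("%s-%d", owner, i)
		if _, exists := c[name]; !exists {
			c[name] = owner
			owned = append(owned, name)
		}
	}
	// Delete surplus Pods.
	for len(owned) > desired {
		delete(c, owned[len(owned)-1])
		owned = owned[:len(owned)-1]
	}
}

func main() {
	c := cluster{"web-0": "web"}
	reconcile(c, "web", 3)
	reconcile(c, "web", 3) // a duplicate request changes nothing
	fmt.Println(len(c))    // 3
	delete(c, "web-1")     // a Pod disappears; no event details are needed
	reconcile(c, "web", 3)
	fmt.Println(len(c)) // 3
}
```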

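After Reconcile returns, the Request is handled as follows:

  • If the call returns an error, the Request is put back into the work queue with AddRateLimited, so repeated failures back off.

  • If RequeueAfter in the result is greater than zero, the queue first clears the Request's rate-limiting history (Forget). It then re-adds the Request once the given duration has elapsed (AddAfter).

  • If RequeueAfter is zero but Requeue is true, the Request is put back into the work queue with AddRateLimited.

  • Otherwise, processing is complete, and the queue clears the Request's rate-limiting history (Forget).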

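In short, if the Reconcile run fails or needs to run again, the Request goes back into the queue. Once the run has completed, the queue stops tracking the Request, and the deferred Done call in processNextWorkItem releases it.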

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
    // ...

    // Make sure that the object is a valid request.
    req, ok := obj.(reconcile.Request)
    if !ok {
        c.Queue.Forget(obj)
        return
    }

    // ...

    result, err := c.Do.Reconcile(ctx, req)
    if err != nil {
        c.Queue.AddRateLimited(req)
        return
    }
    if result.RequeueAfter > 0 {
        c.Queue.Forget(obj)
        c.Queue.AddAfter(req, result.RequeueAfter)
        return
    }
    if result.Requeue {
        c.Queue.AddRateLimited(req)
        return
    }

    c.Queue.Forget(obj)
}

Custom Kubernetes Controllers in Practice

Having covered the Kubernetes Operator pattern, let's look at a practical application.

kubebuilder is an SDK built on controller-runtime that makes it easy to create custom resources and their Controllers. It is simple to use: the kubebuilder init command generates a project scaffold:

kubebuilder init --domain whichxjy.com --repo github.com/whichxjy/kube-controller

Suppose we want to define a resource called Hello. The kubebuilder create api command then generates the files for the resource and its Controller:

kubebuilder create api --group myapp --version v1 --kind Hello

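The structure of the Hello resource is defined in api/v1/hello_types.go. Suppose the job of a Hello resource is to print a given number of Hello lines. We can then add a HelloTimes field to the HelloSpec struct to hold the number of lines to print. The phase the Hello resource is in (similar to a Pod's phase) can be represented by a Phase field in the HelloStatus struct.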

type HelloPhase string

const (
    HelloPending   HelloPhase = "Pending"
    HelloRunning   HelloPhase = "Running"
    HelloSucceeded HelloPhase = "Succeeded"
    HelloFailed    HelloPhase = "Failed"
)

// HelloSpec defines the desired state of Hello
type HelloSpec struct {
    HelloTimes uint `json:"helloTimes,omitempty"`
}

// HelloStatus defines the observed state of Hello
type HelloStatus struct {
    Phase HelloPhase `json:"phase,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// Hello is the Schema for the hellos API
type Hello struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   HelloSpec   `json:"spec,omitempty"`
    Status HelloStatus `json:"status,omitempty"`
}

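The Hello Controller is implemented in controllers/hello_controller.go. A newly created Hello resource has an empty phase, which the Controller treats as Pending. As the resource moves from one phase to the next, the Controller performs a different action in each phase.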

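// controller-runtime v0.8+ signature, matching the Reconciler interface above:
// the context is passed in instead of being created with context.Background().
func (r *HelloReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := r.Log.WithValues("hello", req.NamespacedName)
    logger.Info("Received request", "Namespace", req.Namespace, "Name", req.Name)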

    hello := new(myappv1.Hello)
    if err := r.Get(ctx, req.NamespacedName, hello); err != nil {
        if kerrors.IsNotFound(err) {
            err = nil
        }
        return ctrl.Result{}, err
    }

    if hello.Status.Phase == "" {
        hello.Status.Phase = myappv1.HelloPending
    }

    logger.Info("Check phase", "Phase", hello.Status.Phase)
    requeue := false

    switch hello.Status.Phase {
    case myappv1.HelloPending:
        // ...
    case myappv1.HelloRunning:
        // ...
    case myappv1.HelloSucceeded:
        // ...
    case myappv1.HelloFailed:
        // ...
    default:
        logger.Error(nil, "Invalid phase", "Phase", hello.Status.Phase)
        return ctrl.Result{}, errors.New("Invalid phase")
    }

    // Update hello status.
    if err := r.Status().Update(ctx, hello); err != nil {
        logger.Error(err, "Fail to update hello status")
        return ctrl.Result{}, err
    }

    return ctrl.Result{Requeue: requeue}, nil
}

If the Hello resource is in the Pending phase, the Controller creates a Pod to run the command and then sets the Hello resource's phase to Running.

case myappv1.HelloPending:
    // Create a pod to run commands.
    pod := getHelloPod(hello)
    if err := ctrl.SetControllerReference(hello, pod, r.Scheme); err != nil {
        logger.Error(err, "Fail to set controller reference")
        return ctrl.Result{}, err
    }
    if err := r.Create(ctx, pod); err != nil {
        logger.Error(err, "Fail to create pod")
        return ctrl.Result{}, err
    }
    hello.Status.Phase = myappv1.HelloRunning

The Pod runs an Ubuntu container that prints the number of Hello lines specified by the user.

func getHelloPod(hello *myappv1.Hello) *corev1.Pod {
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: hello.Namespace,
            Name:      hello.Name,
        },
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{
                {
                    Name:  "ubuntu",
                    Image: "ubuntu",
                    Command: []string{
                        "/bin/sh",
                        "-c",
                        fmt.Sprintf(
                            "seq %d | xargs -I{} echo \"Hello\"",
                            hello.Spec.HelloTimes,
                        ),
                    },
                },
            },
            RestartPolicy: corev1.RestartPolicyOnFailure,
        },
    }
}
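The command relies on two behaviors. `seq N` prints one line per number, and `xargs -I{}` runs `echo "Hello"` once per input line, so the container prints Hello N times. The sketch below isolates the Sprintf call from getHelloPod in a hypothetical helloCommand helper. It also adds a hypothetical expectedOutput function that returns the text the pipeline should print, which can serve as an expectation in a unit test.

```go
package main

import (
	"fmt"
	"strings"
)

// helloCommand builds the same container command as getHelloPod.
func helloCommand(times uint) []string {
	return []string{
		"/bin/sh",
		"-c",
		fmt.Sprintf("seq %d | xargs -I{} echo \"Hello\"", times),
	}
}

// expectedOutput is what the pipeline should print: one "Hello" per number
// emitted by seq.
func expectedOutput(times uint) string {
	return strings.Repeat("Hello\n", int(times))
}

func main() {
	fmt.Println(helloCommand(3)[2]) // seq 3 | xargs -I{} echo "Hello"
	fmt.Print(expectedOutput(3))
}
```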

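If the Hello resource is in the Running phase, the Controller checks the Pod's status. If the container ran the command successfully and exited, the Hello resource's phase is set to Succeeded. If the container exited with an error, the phase is set to Failed. Otherwise, the Request is requeued so the Controller checks again later. Note that the Pod uses RestartPolicyOnFailure, so the kubelet restarts a failing container in place, and in practice the Pod rarely reaches the Failed phase. With RestartPolicyNever, a failure would surface as Failed.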

case myappv1.HelloRunning:
    pod := &corev1.Pod{}
    if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
        logger.Error(err, "Fail to get pod")
        return ctrl.Result{}, err
    }

    if pod.Status.Phase == corev1.PodSucceeded {
        hello.Status.Phase = myappv1.HelloSucceeded
    } else if pod.Status.Phase == corev1.PodFailed {
        hello.Status.Phase = myappv1.HelloFailed
    } else {
        requeue = true
    }

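If the Hello resource is in the Succeeded phase, the Reconcile process is complete. If it is in the Failed phase, the Controller deletes the Pod and sets the Hello resource's phase back to Pending to start over.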

case myappv1.HelloSucceeded:
    logger.Info("Done")
    return ctrl.Result{}, nil
case myappv1.HelloFailed:
    pod := getHelloPod(hello)
    if err := r.Delete(ctx, pod); err != nil {
        logger.Error(err, "Fail to delete pod")
        return ctrl.Result{}, err
    }
    hello.Status.Phase = myappv1.HelloPending
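Taken together, the four branches form a small state machine. The pure function below restates the phase transitions with stdlib types only, which makes them easy to unit-test. It is a simplified model: Pod phases are plain strings, and the side effects of creating and deleting the Pod are left out. The names phase and next are hypothetical.

```go
package main

import "fmt"

type phase string

const (
	pending   phase = "Pending"
	running   phase = "Running"
	succeeded phase = "Succeeded"
	failed    phase = "Failed"
)

// next returns the Hello phase after one Reconcile call and whether the
// request should be requeued. podPhase is only consulted while Running.
func next(p phase, podPhase string) (phase, bool, error) {
	switch p {
	case "", pending:
		return running, false, nil // Pod created
	case running:
		switch podPhase {
		case "Succeeded":
			return succeeded, false, nil
		case "Failed":
			return failed, false, nil
		default:
			return running, true, nil // check again later
		}
	case succeeded:
		return succeeded, false, nil // terminal
	case failed:
		return pending, false, nil // Pod deleted, start over
	default:
		return p, false, fmt.Errorf("invalid phase %q", p)
	}
}

func main() {
	p := phase("")
	for _, pod := range []string{"", "Pending", "Running", "Succeeded", ""} {
		var requeue bool
		p, requeue, _ = next(p, pod)
		fmt.Println(p, requeue)
	}
}
```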

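Once the code is written, the make install command installs the custom resource definition into the Kubernetes cluster. The make run command then starts the Controller on the local machine.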

The file config/samples/myapp_v1_hello.yaml contains a sample manifest for the Hello resource. Change it to:

apiVersion: myapp.whichxjy.com/v1
kind: Hello
metadata:
  name: hello-sample
spec:
  helloTimes: 3

Then apply the sample with kubectl apply -f config/samples. After the Controller has been running for a while, kubectl logs hello-sample shows the Pod's output:

Hello
Hello
Hello

The Controller's logs show how the run progressed:

2021-05-30T05:44:01.712+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.712+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Pending"}
2021-05-30T05:44:01.760+0800	DEBUG	controller-runtime.controller	Successfully Reconciled	{"controller": "hello", "request": "default/hello-sample"}
2021-05-30T05:44:01.760+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.760+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.873+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.873+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.889+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.889+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.914+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.914+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.963+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.963+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:02.054+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:02.054+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:02.221+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:02.221+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:02.550+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:02.550+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:03.202+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:03.202+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:04.491+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:04.492+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:07.060+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:07.060+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:12.198+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:12.198+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:12.203+0800	DEBUG	controller-runtime.controller	Successfully Reconciled	{"controller": "hello", "request": "default/hello-sample"}
2021-05-30T05:44:12.204+0800	INFO	controllers.Hello	Received request	{"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:12.204+0800	INFO	controllers.Hello	Check phase	{"hello": "default/hello-sample", "Phase": "Succeeded"}
2021-05-30T05:44:12.204+0800	INFO	controllers.Hello	Done	{"hello": "default/hello-sample"}
2021-05-30T05:44:12.204+0800	DEBUG	controller-runtime.controller	Successfully Reconciled	{"controller": "hello", "request": "default/hello-sample"}

The complete code is available in the GitHub repository.
