Custom Kubernetes Controllers
The Kubernetes Operator Pattern
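Controllers are a key component of Kubernetes, which relies on them to keep the cluster in its desired state. For example, the ReplicaSet Controller maintains a set of Pods according to the user's configuration. Besides using Kubernetes' built-in resources and Controllers, users can define their own resources and Controllers through Kubernetes' extension mechanisms.
The Operator pattern lets us implement our own Controllers. Suppose we define a resource called CR. Whenever a CR resource is created, updated, or deleted, a corresponding event is fired; the CR Controller watches for these events and then handles them.
The controller-runtime library provides a concrete implementation of the Operator pattern, and its source code is a good way to understand how the pattern works. In controller-runtime, Controllers are managed by a Manager and delegate the actual reconciliation logic to a Reconciler. For example, a Controller for the CR resource can be created with the following statement: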
ctrl.NewControllerManagedBy(manager).
	For(&CR{}).
	Complete(reconciler)
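When the Controller starts, it:
- creates a work queue to hold Request objects;
- starts the Watchers, each with an EventHandler, to listen for events (Add / Update / Delete) on the resources the Controller manages;
- starts the worker goroutines that call the Reconciler (one by default; MaxConcurrentReconciles allows several to run in parallel).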
func (c *Controller) Start(ctx context.Context) error {
	// ...
	c.Queue = c.MakeQueue()
	// ...
	for _, watch := range c.startWatches {
		if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
			return err
		}
	}
	// ...
	wg := &sync.WaitGroup{}
	wg.Add(c.MaxConcurrentReconciles)
	for i := 0; i < c.MaxConcurrentReconciles; i++ {
		go func() {
			defer wg.Done()
			// Run a worker thread that just dequeues items, processes them, and marks them done.
			// It enforces that the reconcileHandler is never invoked concurrently with the same object.
			for c.processNextWorkItem(ctx) {
			}
		}()
	}
	// ...
	<-ctx.Done()
	wg.Wait()
	return nil
}
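The shape of Start (launch N workers, then wait for all of them) can be modelled with the standard library alone. The sketch below is a simplified, hypothetical model, not controller-runtime code: a closed buffered channel stands in for the work queue, the maxConcurrent argument plays the role of MaxConcurrentReconciles, and the names request, startWorkers and runExample are made up for illustration. Unlike the real work queue, a channel does not deduplicate requests.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// request is a hypothetical stand-in for reconcile.Request.
type request struct{ Namespace, Name string }

// startWorkers mimics the structure of Controller.Start: it launches
// maxConcurrent goroutines that drain the queue, then waits for all of them.
func startWorkers(ctx context.Context, queue <-chan request, maxConcurrent int, reconcile func(request)) {
	var wg sync.WaitGroup
	wg.Add(maxConcurrent)
	for i := 0; i < maxConcurrent; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // controller is stopping
				case req, ok := <-queue:
					if !ok {
						return // queue shut down and drained
					}
					reconcile(req)
				}
			}
		}()
	}
	wg.Wait()
}

// runExample queues three requests and processes them with two workers.
func runExample() []string {
	queue := make(chan request, 3)
	for _, name := range []string{"a", "b", "c"} {
		queue <- request{Namespace: "default", Name: name}
	}
	close(queue) // no more work: workers exit after draining

	var mu sync.Mutex
	var done []string
	startWorkers(context.Background(), queue, 2, func(r request) {
		mu.Lock()
		defer mu.Unlock()
		done = append(done, r.Name)
	})
	sort.Strings(done) // workers finish in any order
	return done
}

func main() {
	fmt.Println(runExample()) // prints [a b c]
}
```

The WaitGroup is what lets Start return only after every worker has finished its current item, which is the same reason the original code calls wg.Wait() after ctx.Done().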
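The Request stored in the work queue has the following structure. Note that it contains only the Namespace and Name of the object that triggered the event; it carries neither the object's contents nor any information about the event itself.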
// Request contains the information necessary to reconcile a Kubernetes object.
// This includes the information to uniquely identify the object - its Name and Namespace.
// It does NOT contain information about any specific Event or the object contents itself.
type Request struct {
	// NamespacedName is the name and namespace of the object to reconcile.
	NamespacedName
}
type NamespacedName struct {
	Namespace string
	Name      string
}
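Because a Request carries only the object's key, every event for the same object produces an identical, comparable value. The hypothetical sketch below re-declares the two types locally (it does not import controller-runtime) and shows a create event and two update events for one object collapsing into a single pending Request when requests are kept in a set. The event type and pendingRequests function are illustrative only; the real work queue performs a similar deduplication for items still waiting to be processed.

```go
package main

import "fmt"

// Local copies of the shapes shown above; not imported from controller-runtime.
type NamespacedName struct {
	Namespace string
	Name      string
}

type Request struct {
	NamespacedName
}

// event is a hypothetical record of something that happened to an object.
type event struct {
	Kind      string // "Add", "Update" or "Delete"
	Namespace string
	Name      string
}

// pendingRequests keeps one Request per object key; the event kind is dropped.
func pendingRequests(events []event) []Request {
	seen := map[Request]bool{}
	var out []Request
	for _, e := range events {
		req := Request{NamespacedName{Namespace: e.Namespace, Name: e.Name}}
		if !seen[req] {
			seen[req] = true
			out = append(out, req)
		}
	}
	return out
}

func main() {
	events := []event{
		{"Add", "default", "hello-sample"},
		{"Update", "default", "hello-sample"},
		{"Update", "default", "other"},
		{"Update", "default", "hello-sample"},
	}
	for _, r := range pendingRequests(events) {
		fmt.Println(r.Namespace + "/" + r.Name)
	}
	// prints:
	// default/hello-sample
	// default/other
}
```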
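EventHandler defines a set of callbacks. When a custom resource is created, updated, or deleted, the matching callback wraps the object in an event, checks it against every Predicate, and passes it to the wrapped handler.EventHandler, which (for the resource passed to For) adds a Request containing the resource's Namespace and Name to the work queue.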
type EventHandler struct {
	EventHandler handler.EventHandler
	Queue        workqueue.RateLimitingInterface
	Predicates   []predicate.Predicate
}
// OnAdd creates CreateEvent and calls Create on EventHandler
func (e EventHandler) OnAdd(obj interface{}) {
	c := event.CreateEvent{}
	// Pull Object out of the object
	if o, ok := obj.(client.Object); ok {
		c.Object = o
	} else {
		log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}
	for _, p := range e.Predicates {
		if !p.Create(c) {
			return
		}
	}
	// Invoke create handler
	e.EventHandler.Create(c, e.Queue)
}
// OnUpdate creates UpdateEvent and calls Update on EventHandler
func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
	u := event.UpdateEvent{}
	if o, ok := oldObj.(client.Object); ok {
		u.ObjectOld = o
	} else {
		log.Error(nil, "OnUpdate missing ObjectOld", "object", oldObj, "type", fmt.Sprintf("%T", oldObj))
		return
	}
	// Pull Object out of the object
	if o, ok := newObj.(client.Object); ok {
		u.ObjectNew = o
	} else {
		log.Error(nil, "OnUpdate missing ObjectNew", "object", newObj, "type", fmt.Sprintf("%T", newObj))
		return
	}
	for _, p := range e.Predicates {
		if !p.Update(u) {
			return
		}
	}
	// Invoke update handler
	e.EventHandler.Update(u, e.Queue)
}
// OnDelete creates DeleteEvent and calls Delete on EventHandler
func (e EventHandler) OnDelete(obj interface{}) {
	d := event.DeleteEvent{}
	// ...
	// Pull Object out of the object
	if o, ok := obj.(client.Object); ok {
		d.Object = o
	} else {
		log.Error(nil, "OnDelete missing Object", "object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}
	for _, p := range e.Predicates {
		if !p.Delete(d) {
			return
		}
	}
	// Invoke delete handler
	e.EventHandler.Delete(d, e.Queue)
}
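The Predicates loop in each callback means that any single predicate can veto an event before it reaches the queue. As a hedged illustration, here is a standard-library model of an update predicate in the spirit of controller-runtime's predicate.GenerationChangedPredicate, which drops updates whose metadata.generation did not change (for example, writes to the status subresource). The types obj, updateEvent and updatePredicate, and the functions generationChanged and enqueueIfAllowed, are hypothetical simplifications rather than the library's API.

```go
package main

import "fmt"

// obj is a hypothetical, minimal stand-in for client.Object.
type obj struct {
	Namespace, Name string
	Generation      int64
}

type updateEvent struct{ Old, New obj }

// updatePredicate returns false to filter an update event out.
type updatePredicate func(updateEvent) bool

// generationChanged keeps only updates that bumped metadata.generation.
func generationChanged(e updateEvent) bool {
	return e.Old.Generation != e.New.Generation
}

// enqueueIfAllowed mirrors OnUpdate: every predicate must pass before the
// object's key is added to the queue.
func enqueueIfAllowed(e updateEvent, preds []updatePredicate, queue *[]string) {
	for _, p := range preds {
		if !p(e) {
			return
		}
	}
	*queue = append(*queue, e.New.Namespace+"/"+e.New.Name)
}

func main() {
	var queue []string
	preds := []updatePredicate{generationChanged}
	// Status-only update: generation unchanged, so it is dropped.
	enqueueIfAllowed(updateEvent{obj{"default", "a", 1}, obj{"default", "a", 1}}, preds, &queue)
	// Spec change: generation bumped, so it is enqueued.
	enqueueIfAllowed(updateEvent{obj{"default", "a", 1}, obj{"default", "a", 2}}, preds, &queue)
	fmt.Println(queue) // prints [default/a]
}
```

A filter like this is a trade-off: the Hello controller later in this article relies on its own status updates to trigger the next reconcile, so adding such a predicate to it would likely leave a Hello stuck in the Running phase.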
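Each worker goroutine tries to take a Request from the work queue: Queue.Get blocks while the queue is empty, and otherwise returns the next Request, which is then handed to reconcileHandler for processing.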
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.Queue.Get()
	if shutdown {
		// Stop working
		return false
	}
	defer c.Queue.Done(obj)
	// ...
	c.reconcileHandler(ctx, obj)
	return true
}
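Queue.Get and Queue.Done come from client-go's workqueue, and they are what allow the comment in Start to promise that the same object is never reconciled concurrently. The following is a simplified model of that idea, not client-go's actual code: a dirty set deduplicates waiting keys, a processing set tracks keys currently held by a worker, and a key re-added while it is being processed is only put back in line when Done is called. The queue type and its methods are hypothetical names chosen to echo the real interface.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a minimal model of a deduplicating work queue.
type queue struct {
	mu         sync.Mutex
	cond       *sync.Cond
	items      []string        // keys waiting, in FIFO order
	dirty      map[string]bool // keys that need processing
	processing map[string]bool // keys currently held by a worker
	shutdown   bool
}

func newQueue() *queue {
	q := &queue{dirty: map[string]bool{}, processing: map[string]bool{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add marks key dirty; it is queued only if not already waiting or in progress.
func (q *queue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shutdown || q.dirty[key] {
		return
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // Done will put it back in line
	}
	q.items = append(q.items, key)
	q.cond.Signal()
}

// Get blocks until a key is available or the queue is shut down.
func (q *queue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shutdown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	key := q.items[0]
	q.items = q.items[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, false
}

// Done releases key; if it was re-added meanwhile, it is queued again.
func (q *queue) Done(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.processing, key)
	if q.dirty[key] {
		q.items = append(q.items, key)
		q.cond.Signal()
	}
}

// ShutDown wakes all blocked Get calls.
func (q *queue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shutdown = true
	q.cond.Broadcast()
}

func (q *queue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

func main() {
	q := newQueue()
	q.Add("default/hello-sample")
	q.Add("default/hello-sample") // deduplicated
	fmt.Println(q.Len())          // prints 1

	key, _ := q.Get()
	q.Add(key)           // re-added while processing: held back
	fmt.Println(q.Len()) // prints 0
	q.Done(key)
	fmt.Println(q.Len()) // prints 1
}
```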
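How a Request is processed depends on the Reconciler the user supplied when creating the Controller. As its interface definition shows, a Reconciler must implement a Reconcile method, which receives a Request and, through its return values, tells the work queue what to do with that Request next.
Because a Request contains only the resource's Namespace and Name, the Reconciler has to query the cluster itself to find out the current state of the relevant resources.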
// Reconciliation is level-based, meaning action isn't driven off changes in individual Events, but instead is
// driven by actual cluster state read from the apiserver or a local cache.
// For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted,
// instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
type Reconciler interface {
	// Reconciler performs a full reconciliation for the object referred to by the Request.
	// The Controller will requeue the Request to be processed again if an error is non-nil or
	// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
	Reconcile(context.Context, Request) (Result, error)
}
// Result contains the result of a Reconciler invocation.
type Result struct {
	// Requeue tells the Controller to requeue the reconcile key. Defaults to false.
	Requeue bool
	// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
	// Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
	RequeueAfter time.Duration
}
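The interface comment describes level-based reconciliation: the Reconciler looks up the current state by key instead of replaying events. Below is a hypothetical, standard-library model of that idea. The cluster type with its desired and actual maps stands in for the API server or informer cache, and reconcile treats a missing object as already deleted, much as the Hello example later in this article ignores IsNotFound errors. None of these names come from controller-runtime.

```go
package main

import "fmt"

// key identifies an object, like the NamespacedName in a Request.
type key struct{ Namespace, Name string }

// cluster is a hypothetical stand-in for state read from the cache.
type cluster struct {
	desired map[key]int // spec: desired replicas per object
	actual  map[key]int // observed replicas per object
}

// reconcile converges actual toward desired for one key; it never sees the event.
func (c *cluster) reconcile(k key) string {
	want, ok := c.desired[k]
	if !ok {
		// Object is gone: clean up whatever is left, then stop.
		delete(c.actual, k)
		return "deleted"
	}
	if c.actual[k] == want {
		return "in sync"
	}
	c.actual[k] = want
	return "updated"
}

func main() {
	k := key{"default", "web"}
	c := &cluster{desired: map[key]int{k: 3}, actual: map[key]int{}}
	fmt.Println(c.reconcile(k)) // prints updated
	fmt.Println(c.reconcile(k)) // prints in sync
	delete(c.desired, k)        // the object is deleted; the Request only has its key
	fmt.Println(c.reconcile(k)) // prints deleted
}
```

Because reconcile is idempotent, running it again for the same key is harmless, which is why it is safe for the work queue to merge several events into one Request.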
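After Reconcile returns, the Request is handled as follows. If the call returns an error, the Request is put back into the work queue with rate limiting (AddRateLimited). If the result's RequeueAfter is greater than zero, the queue first forgets the Request (Forget clears its rate-limiting history) and then re-adds it once the given duration has elapsed (AddAfter). If RequeueAfter is zero but Requeue is true, the Request is put back into the work queue with rate limiting. If none of these apply, the Request has been fully processed: Forget clears its rate-limiting history, and the Done call deferred in processNextWorkItem marks it as no longer being processed.
In short, if the reconcile run fails or needs to run again, the Request goes back into the queue; once it completes, the queue stops tracking it.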
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
	// ...
	// Make sure that the object is a valid request.
	req, ok := obj.(reconcile.Request)
	if !ok {
		c.Queue.Forget(obj)
		return
	}
	// ...
	// Declare result outside the if statement so it is still in scope below.
	result, err := c.Do.Reconcile(ctx, req)
	if err != nil {
		c.Queue.AddRateLimited(req)
		return
	}
	if result.RequeueAfter > 0 {
		c.Queue.Forget(obj)
		c.Queue.AddAfter(req, result.RequeueAfter)
		return
	}
	if result.Requeue {
		c.Queue.AddRateLimited(req)
		return
	}
	c.Queue.Forget(obj)
}
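Building a Custom Kubernetes Controller in Practice
Having covered the Kubernetes Operator pattern, let's now look at a practical application.
kubebuilder is an SDK built on controller-runtime that makes it convenient to create custom resources and Controllers. It is also simple to use: the kubebuilder init command generates a project scaffold: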
kubebuilder init --domain whichxjy.com --repo github.com/whichxjy/kube-controller
Suppose we want to define a resource called Hello. The kubebuilder create api command then generates the files for the resource and its Controller:
kubebuilder create api --group myapp --version v1 --kind Hello
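The structure of the Hello resource is defined in api/v1/hello_types.go. Suppose the Hello resource's job is to print a given number of Hello lines. We can then add a HelloTimes field to the HelloSpec struct to hold the number of Hello lines to print, and represent the resource's current phase (similar to a Pod's phase) with a Phase field in the HelloStatus struct.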
type HelloPhase string
const (
	HelloPending   HelloPhase = "Pending"
	HelloRunning   HelloPhase = "Running"
	HelloSucceeded HelloPhase = "Succeeded"
	HelloFailed    HelloPhase = "Failed"
)
// HelloSpec defines the desired state of Hello
type HelloSpec struct {
	HelloTimes uint `json:"helloTimes,omitempty"`
}
// HelloStatus defines the observed state of Hello
type HelloStatus struct {
	Phase HelloPhase `json:"phase,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// Hello is the Schema for the hellos API
type Hello struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   HelloSpec   `json:"spec,omitempty"`
	Status HelloStatus `json:"status,omitempty"`
}
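The json tags decide how these fields appear in manifests such as the sample configuration later in this article, and omitempty means an unset phase is simply absent. The sketch below re-declares HelloSpec and HelloStatus locally, wrapped in a hypothetical helloBody struct without TypeMeta/ObjectMeta so that it runs with only the standard library, and marshals them with encoding/json. The helper mustJSON is also illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type HelloPhase string

// Local copies of the spec and status types above.
type HelloSpec struct {
	HelloTimes uint `json:"helloTimes,omitempty"`
}

type HelloStatus struct {
	Phase HelloPhase `json:"phase,omitempty"`
}

// helloBody is a hypothetical stand-in for Hello without the metadata types.
type helloBody struct {
	Spec   HelloSpec   `json:"spec,omitempty"`
	Status HelloStatus `json:"status,omitempty"`
}

func mustJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(mustJSON(helloBody{Spec: HelloSpec{HelloTimes: 3}}))
	// prints {"spec":{"helloTimes":3},"status":{}}

	var decoded helloBody
	if err := json.Unmarshal([]byte(`{"spec":{"helloTimes":3}}`), &decoded); err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", decoded.Status.Phase) // prints ""
}
```

Two details are worth noting. encoding/json never omits struct-typed fields, so "status" still appears as {}. A freshly created resource therefore decodes with an empty Phase, which is why the Reconcile function below treats "" as Pending. Also, with omitempty, helloTimes: 0 cannot be told apart from an unset field.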
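The Hello Controller is implemented in controllers/hello_controller.go. A newly created Hello resource starts in the Pending phase, and the Controller performs different actions as the resource moves from one phase to the next. Note that this Reconcile uses the older signature without a context.Context parameter (controller-runtime releases before v0.7), which is why it creates its own ctx; newer releases pass ctx as the first argument, matching the Reconciler interface shown earlier.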
func (r *HelloReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	logger := r.Log.WithValues("hello", req.NamespacedName)
	logger.Info("Received request", "Namespace", req.Namespace, "Name", req.Name)
	ctx := context.Background()
	hello := new(myappv1.Hello)
	if err := r.Get(ctx, req.NamespacedName, hello); err != nil {
		if kerrors.IsNotFound(err) {
			err = nil
		}
		return ctrl.Result{}, err
	}
	if hello.Status.Phase == "" {
		hello.Status.Phase = myappv1.HelloPending
	}
	logger.Info("Check phase", "Phase", hello.Status.Phase)
	requeue := false
	switch hello.Status.Phase {
	case myappv1.HelloPending:
		// ...
	case myappv1.HelloRunning:
		// ...
	case myappv1.HelloSucceeded:
		// ...
	case myappv1.HelloFailed:
		// ...
	default:
		logger.Error(nil, "Invalid phase", "Phase", hello.Status.Phase)
		return ctrl.Result{}, errors.New("Invalid phase")
	}
	// Update hello status.
	if err := r.Status().Update(ctx, hello); err != nil {
		logger.Error(err, "Fail to update hello status")
		return ctrl.Result{}, err
	}
	return ctrl.Result{Requeue: requeue}, nil
}
If the Hello resource is in the Pending phase, the Controller creates a Pod to run the command and then sets the resource's phase to Running.
case myappv1.HelloPending:
	// Create a pod to run commands.
	pod := getHelloPod(hello)
	if err := ctrl.SetControllerReference(hello, pod, r.Scheme); err != nil {
		logger.Error(err, "Fail to set controller reference")
		return ctrl.Result{}, err
	}
	if err := r.Create(ctx, pod); err != nil {
		logger.Error(err, "Fail to create pod")
		return ctrl.Result{}, err
	}
	hello.Status.Phase = myappv1.HelloRunning
The Pod runs an Ubuntu container that prints the user-specified number of Hello lines.
func getHelloPod(hello *myappv1.Hello) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: hello.Namespace,
			Name:      hello.Name,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "ubuntu",
					Image: "ubuntu",
					Command: []string{
						"/bin/sh",
						"-c",
						fmt.Sprintf(
							"seq %d | xargs -I{} echo \"Hello\"",
							hello.Spec.HelloTimes,
						),
					},
				},
			},
			// Never rather than OnFailure: with OnFailure the kubelet restarts a failed
			// container, so the Pod would not reach the Failed phase checked in the Running case.
			RestartPolicy: corev1.RestartPolicyNever,
		},
	}
}
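For HelloTimes = 3, the shell command passed to the container expands as shown below. This is a small standard-library check of the fmt.Sprintf call only; helloCommand is a hypothetical helper that rebuilds the same Command slice outside of corev1.

```go
package main

import "fmt"

// helloCommand reproduces the Command slice built in getHelloPod.
func helloCommand(times uint) []string {
	return []string{
		"/bin/sh",
		"-c",
		fmt.Sprintf("seq %d | xargs -I{} echo \"Hello\"", times),
	}
}

func main() {
	cmd := helloCommand(3)
	fmt.Println(cmd[2]) // prints seq 3 | xargs -I{} echo "Hello"
}
```

seq 3 writes the lines 1, 2 and 3, and xargs -I{} runs echo "Hello" once per input line (the {} placeholder is unused), so the container prints Hello three times. With HelloTimes = 0, seq prints nothing and no Hello is printed.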
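If the Hello resource is in the Running phase, the Controller checks the Pod's status. If the container ran the command successfully and exited, the Hello resource's phase is set to Succeeded; if the container exited with an error, it is set to Failed; otherwise the Controller requeues the Request and checks the status again later.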
case myappv1.HelloRunning:
	pod := &corev1.Pod{}
	if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
		logger.Error(err, "Fail to get pod")
		return ctrl.Result{}, err
	}
	if pod.Status.Phase == corev1.PodSucceeded {
		hello.Status.Phase = myappv1.HelloSucceeded
	} else if pod.Status.Phase == corev1.PodFailed {
		hello.Status.Phase = myappv1.HelloFailed
	} else {
		requeue = true
	}
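If the Hello resource is in the Succeeded phase, reconciliation is complete. If it is in the Failed phase, the Controller deletes the Pod and sets the resource's phase back to Pending, so the whole process starts over.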
case myappv1.HelloSucceeded:
	logger.Info("Done")
	return ctrl.Result{}, nil
case myappv1.HelloFailed:
	pod := getHelloPod(hello)
	if err := r.Delete(ctx, pod); err != nil {
		logger.Error(err, "Fail to delete pod")
		return ctrl.Result{}, err
	}
	hello.Status.Phase = myappv1.HelloPending
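Taken together, the four cases form a small state machine. The pure function below is a hypothetical summary of those transitions. It uses plain strings instead of the myappv1 and corev1 constants so that it runs on its own, it ignores the side effects (creating and deleting the Pod), and it reports only the next phase and whether the Reconciler asks to be requeued.

```go
package main

import "fmt"

// nextPhase summarizes the Hello Reconcile switch.
// podPhase is only consulted in the Running phase.
func nextPhase(phase, podPhase string) (string, bool, error) {
	switch phase {
	case "", "Pending":
		// Pod created.
		return "Running", false, nil
	case "Running":
		switch podPhase {
		case "Succeeded", "Failed":
			return podPhase, false, nil
		default:
			// Pod still Pending or Running: check again later.
			return "Running", true, nil
		}
	case "Succeeded":
		// Nothing left to do.
		return "Succeeded", false, nil
	case "Failed":
		// Pod deleted; start over.
		return "Pending", false, nil
	default:
		return "", false, fmt.Errorf("invalid phase %q", phase)
	}
}

func main() {
	phase := ""
	// Pod phases observed on each pass (ignored outside Running).
	for _, pod := range []string{"", "Pending", "Running", "Succeeded", ""} {
		next, requeue, err := nextPhase(phase, pod)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%q -> %q (requeue=%v)\n", phase, next, requeue)
		phase = next
	}
}
```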
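Once the code is finished, run make install to install the custom resource definition into the Kubernetes cluster, and then run make run to start the Controller.
The file config/samples/myapp_v1_hello.yaml contains a sample configuration for the Hello resource. Change it to: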
apiVersion: myapp.whichxjy.com/v1
kind: Hello
metadata:
  name: hello-sample
spec:
  helloTimes: 3
Then apply the sample configuration with kubectl apply -f config/samples. After the Controller has been running for a while, kubectl logs hello-sample shows the Pod's output:
Hello
Hello
Hello
The Controller's logs show how the run progressed:
2021-05-30T05:44:01.712+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.712+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Pending"}
2021-05-30T05:44:01.760+0800 DEBUG controller-runtime.controller Successfully Reconciled {"controller": "hello", "request": "default/hello-sample"}
2021-05-30T05:44:01.760+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.760+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.873+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.873+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.889+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.889+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.914+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.914+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:01.963+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:01.963+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:02.054+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:02.054+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:02.221+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:02.221+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:02.550+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:02.550+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:03.202+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:03.202+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:04.491+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:04.492+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:07.060+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:07.060+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:12.198+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:12.198+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Running"}
2021-05-30T05:44:12.203+0800 DEBUG controller-runtime.controller Successfully Reconciled {"controller": "hello", "request": "default/hello-sample"}
2021-05-30T05:44:12.204+0800 INFO controllers.Hello Received request {"hello": "default/hello-sample", "Namespace": "default", "Name": "hello-sample"}
2021-05-30T05:44:12.204+0800 INFO controllers.Hello Check phase {"hello": "default/hello-sample", "Phase": "Succeeded"}
2021-05-30T05:44:12.204+0800 INFO controllers.Hello Done {"hello": "default/hello-sample"}
2021-05-30T05:44:12.204+0800 DEBUG controller-runtime.controller Successfully Reconciled {"controller": "hello", "request": "default/hello-sample"}
The complete code is available in the GitHub repository.