入门

我们将创建一个示例项目,以让您了解它是如何工作的。这个示例将:

  • 对 Memcached CR 进行调整 - 该 CR 表示在集群上部署/管理的 Memcached 实例
  • 使用 Memcached 镜像创建一个部署
  • 不允许实例数量超过将在 CR 中定义的大小
  • 更新 Memcached CR 状态

创建一个项目

首先,创建并导航到你的项目目录。然后,使用 kubebuilder 初始化它:

mkdir $GOPATH/memcached-operator
cd $GOPATH/memcached-operator
kubebuilder init --domain=example.com

创建 Memcached API(创建、读取、删除):

接下来,我们将创建一个 API,负责在集群上部署和管理 Memcached 实例。

kubebuilder create api --group cache --version v1alpha1 --kind Memcached

理解API

该命令的主要目标是为 Memcached 类型生成自定义资源(CR)和自定义资源定义(CRD)。它创建了一个 API,分组为 cache.example.com,版本为 v1alpha1,唯一标识 Memcached 类型的新 CRD。通过使用 Kubebuilder 工具,我们可以定义我们的 API 和对象,以表示我们针对这些平台的解决方案。

虽然在这个例子中我们只添加了一种资源,但我们可以根据需要添加任意数量的 GroupKind。为了更容易理解,可以将 CRD 看作是我们自定义对象的定义,而 CR 则是这些对象的实例。

定义我们的 API

定义规格

现在,我们将定义集群中每个 Memcached 资源实例可以具有的值。在这个例子中,我们将允许通过以下方式配置实例的数量:

type MemcachedSpec struct {
	...
	Size int32 `json:"size,omitempty"`
}

创建状态定义

我们还希望追踪 Memcached 自定义资源(CR)的管理状态,可以让我们通过验证自定义资源(CR)对我们自定义 API 的描述,判断是否一切顺利进行或者是否遇到任何错误。这与我们对 Kubernetes API 中的其它资源所做的操作类似。

// MemcachedStatus 定义了 Memcached 的观察状态
type MemcachedStatus struct {
	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}

标记和验证

此外,我们希望验证我们在 自定义资源(CustomResource) 中添加的值,以确保这些值是有效的。为此,我们将使用 标记,例如 +kubebuilder:validation:Minimum=1

现在,请查看我们完全完成的示例。

../getting-started/testdata/project/api/v1alpha1/memcached_types.go
Apache License

Copyright 2025 The Kubernetes authors.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Imports
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
// MemcachedSpec defines the desired state of Memcached.
type MemcachedSpec struct {
	// 插入额外的规格字段 - 集群的期望状态
	// 注意:修改此文件后,运行"make"以重新生成代码

	// Size defines the number of Memcached instances
	// The following markers will use OpenAPI v3 schema to validate the value
	// More info: https://book.kubebuilder.io/reference/markers/crd-validation.html
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:validation:Maximum=3
	// +kubebuilder:validation:ExclusiveMaximum=false
	Size int32 `json:"size,omitempty"`
}

// MemcachedStatus defines the observed state of Memcached.
type MemcachedStatus struct {
	// Represents the observations of a Memcached's current state.
	// Memcached.status.conditions.type are: "Available", "Progressing", and "Degraded"
	// Memcached.status.conditions.status are one of True, False, Unknown.
	// Memcached.status.conditions.reason the value should be a CamelCase string and producers of specific
	// condition types may define expected values and meanings for this field, and whether the values
	// are considered a guaranteed API.
	// Memcached.status.conditions.Message is a human readable message indicating details about the transition.
	// For further information see: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties

	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// Memcached is the Schema for the memcacheds API.
type Memcached struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   MemcachedSpec   `json:"spec,omitempty"`
	Status MemcachedStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// MemcachedList contains a list of Memcached.
type MemcachedList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Memcached `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Memcached{}, &MemcachedList{})
}

生成具有规格和验证的清单

生成所有所需的文件:

  1. 运行 make generate,会在 api/v1alpha1/zz_generated.deepcopy.go 文件中添加深度拷贝(DeepCopy)内容。

  2. 运行 make manifests,会在 config/crd/bases 下生成 CRD 清单,并在 config/crd/samples 下生成一个示例文件。

这两个命令都使用 controller-gen 工具,但是用于代码和清单生成的标志会有所不同。

config/crd/bases/cache.example.com_memcacheds.yaml: Our Memcached CRD
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.17.2
  name: memcacheds.cache.example.com
spec:
  group: cache.example.com
  names:
    kind: Memcached
    listKind: MemcachedList
    plural: memcacheds
    singular: Memcached
  scope: Namespaced
  versions:
  - name: v1alpha1
    schema:
      openAPIV3Schema:
        description: Memcached 是 memcacheds API 的架构。
        properties:
          apiVersion:
            description: |-
APIVersion 定义了对象表示的版本化架构。
 服务器应将已识别的架构转换为最新的内部值,并且
 可能会拒绝无法识别的值。
 更多信息: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
 type: string
          kind:
            description: |-
Kind是一个字符串值,表示该对象所代表的REST资源。服务器可以从客户端提交请求的端点推断出这一点。不能被更新。采用驼峰命名法。更多信息: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
  type: string
          metadata:
            type: object
          spec:
            description: MemcachedSpec 定义了 Memcached 的期望状态。
            properties:
              size:
                description: |-
大小定义了 Memcached 实例的数量
以下标记将使用 OpenAPI v3 架构来验证值
更多信息: https://book.kubebuilder.io/reference/markers/crd-validation.html
 format: int32
                maximum: 3
                minimum: 1
                type: integer
            type: object
          status:
            description: MemcachedStatus 定义了 Memcached 的观察状态。
            properties:
              conditions:
                items:
                  description: 条件包含当前一个方面的详细信息。
                    该API资源的状态。
                  properties:
                    lastTransitionTime:
                      description: |-
lastTransitionTime 是条件从一种状态转换到另一种状态的最后时间。这应该是基础条件发生变化的时间。如果这未知,那么使用 API 字段变化的时间也是可以接受的。
 format: date-time
                      type: string
                    message:
                      description: |-
消息是一个可读的人类消息,指示有关过渡的详细信息。
这可以是一个空字符串。
 maxLength: 32768
                      type: string
                    observedGeneration:
                      description: |-
observedGeneration 表示设置条件时的 .metadata.generation 值。
例如,如果 .metadata.generation 当前是 12,但 .status.conditions[x].observedGeneration 是 9,那么该条件相对于实例的当前状态是过时的。
 format: int64
                      minimum: 0
                      type: integer
                    reason:
                      description: |-
原因包含一个程序标识符,用于指示该条件最后一次转换的原因。
                        特定条件类型的生产者可能会定义该字段的预期值和含义,
                        以及这些值是否被视为保证的 API。
                        该值应为 CamelCase 字符串。
                        此字段不得为空。
                      maxLength: 1024
                      minLength: 1
                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
                      type: string
                    status:
                      description: 条件的状态,可能是:真、假、未知。
                      enum:
                      - "True"
                      - "False"
                      - Unknown
                      type: string
                    type:
                      description: 在CamelCase中或在foo.example.com/CamelCase中输入条件类型。
                      maxLength: 316
                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
                      type: string
                  required:
                  - lastTransitionTime
                  - message
                  - reason
                  - status
                  - type
                  type: object
                type: array
            type: object
        type: object
    served: true
    storage: true
    subresources:
      status: {}

自定义资源示例

config/samples 目录下的清单,可以应用于集群的自定义资源示例。在这个例子中,通过将给定的资源应用于集群,我们将生成一个实例大小为1的部署(请参见 size: 1)。

apiVersion: cache.example.com/v1alpha1
kind: Memcached
metadata:
  labels:
    app.kubernetes.io/name: project
    app.kubernetes.io/managed-by: kustomize
  name: memcached-sample
spec:
  # TODO(用户):编辑以下值以确保您的操作数在集群中的 Pod/实例数量
 size: 1

对账流程

以简化的方式来说,Kubernetes 通过允许我们声明系统的期望状态来工作,然后其控制器持续观察集群,并采取行动以确保实际状态与期望状态保持一致。对于我们的自定义 API 和控制器,过程类似。请记住,我们正在扩展 Kubernetes 的行为和 API,以满足我们的特定需求。

在我们的控制器中,我们将实施一个对账流程。

本质上, reconciliation 过程作为一个循环运行,不断检查条件并执行必要的操作,直到达到所需的状态。该过程将持续运行,直到系统中的所有条件与我们实施中定义的所需状态一致。

这是一个伪代码示例来说明这一点:

reconcile App {

  // 检查应用的 Deployment 是否存在,如果不存在,则创建一个
  // 如果发生错误,则从调整的开始重新启动
  if err != nil {
    return reconcile.Result{}, err
  }

  // 检查应用程序是否存在服务,如果不存在,则创建一个
  // 如果出现错误,则从调整的开头重新开始
  if err != nil {
    return reconcile.Result{}, err
  }

  // 查找数据库 CR/CRD
  // 检查数据库部署的副本大小
  // 如果 deployment.replicas 的大小与 cr.size 不匹配,则更新它
  // 然后,从对账的开始重新启动。例如,通过返回 `reconcile.Result{Requeue: true}, nil`。 
  if err != nil {
    return reconcile.Result{Requeue: true}, nil
  }
  ...

  // 如果在循环结束时:
  // 一切都成功执行,并且对账可以停止 
  return reconcile.Result{}, nil

}

在我们例子的背景下。

当我们的示例自定义资源(CR)应用到集群时(即 kubectl apply -f config/sample/cache_v1alpha1_memcached.yaml),我们希望确保为我们的 Memcached 镜像创建一个部署(Deployment),并且它与 CR 中定义的副本数量相匹配。

为实现这一目标,我们需要首先执行一个操作,以检查我们的 Memcached 实例的 Deployment 是否已经存在于集群中。如果不存在,控制器将相应地创建该 Deployment。因此,我们的调整过程必须包含一个操作,以确保这一期望状态持续保持。该操作将涉及:

	// 检查部署是否已经存在,如果不存在则创建一个新部署。
	found := &appsv1.Deployment{}
	err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
	if err != nil && apierrors.IsNotFound(err) {
		// 定义新的部署 
		dep := r.deploymentForMemcached()
		// 在集群上创建部署
		if err = r.Create(ctx, dep); err != nil {
            log.Error(err, "无法创建新的部署",
            "Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
            return ctrl.Result{}, err
        }
		...
	}

接下来,请注意 deploymentForMemcached() 函数需要定义并返回应该在集群上创建的 Deployment。该函数应该根据以下示例构建具有必要规格的 Deployment 对象:

    dep := &appsv1.Deployment{
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Image:           "memcached:1.6.26-alpine3.19",
						Name:            "memcached",
						ImagePullPolicy: corev1.PullIfNotPresent,
						Ports: []corev1.ContainerPort{{
							ContainerPort: 11211,
							Name:          "memcached",
						}},
						Command: []string{"memcached", "--memory-limit=64", "-o", "modern", "-v"},
					}},
				},
			},
		},
	}

此外,我们需要实施一个机制,以验证集群中的 Memcached 副本数量是否与自定义资源(CR)中指定的期望数量相匹配。如果存在差异,调整过程必须更新集群以确保一致性。这意味着每当在集群上创建或更新 Memcached 类型的 CR 时,控制器将持续调整状态,直到实际副本数量与期望数量相匹配。以下示例说明了这一过程:

	...
	size := memcached.Spec.Size
	if *found.Spec.Replicas != size {
		found.Spec.Replicas = &size
		if err = r.Update(ctx, found); err != nil {
			log.Error(err, "Failed to update Deployment",
				"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
            return ctrl.Result{}, err
        }
    ...

现在,您可以查看负责管理 Memcached 类型自定义资源的完整控制器。该控制器确保集群中维持所需状态,确保我们的 Memcached 实例继续运行,并保持用户指定的副本数量。

internal/controller/memcached_controller.go: Our Controller Implementation
/*
 版权 2025 The Kubernetes authors.
 根据 Apache 许可证,版本 2.0("许可证")授权;
 您只能在遵守该许可证的情况下使用此文件。
 您可以在以下地址获得该许可证的副本:

   http://www.apache.org/licenses/LICENSE-2.0 

 除非适用法律要求或书面同意,否则根据该许可证分发的软件
 是按"原样"基础分发的,
 不提供任何形式的担保或条件,无论是明示还是否明示。
有关许可证下的具体权限和限制,请参见许可证。
*/

package controller

import (
	"context"
	"fmt"

	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"

	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	logf "sigs.k8s.io/controller-runtime/pkg/log"

	cachev1alpha1 "example.com/memcached/api/v1alpha1"
)

// 管理状态条件的定义const (
	// typeAvailableMemcached 表示部署调整的状态
	typeAvailableMemcached = "Available"
)

// MemcachedReconciler 负责调整 Memcached 对象。
type MemcachedReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch

// Reconcile 是主 Kubernetes 调整循环的一部分,旨在将集群的当前状态向期望状态靠拢. 
// 控制器的调整循环必须是幂等的。遵循操作员模式,您将创建提供调整功能的控制器,
// 该功能负责同步资源,直到集群达到期望状态。
// 违反此建议会违背控制器运行时的设计原则,可能导致意想不到的后果,例如资源卡住并需要手动干预。
// 了解更多信息:
// - 关于操作员模式:https://kubernetes.io/docs/concepts/extend-kubernetes/operator/ 
// - 关于控制器:https://kubernetes.io/docs/concepts/architecture/controller/ 
// 
// 有关更多详细信息,请查看 Reconcile 及其结果: 
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.4/pkg/reconcile
func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := logf.FromContext(ctx)

	// 获取 Memcached 实例
	// 目的是检查 Kind Memcached 的自定义资源是否
	// 已在集群上应用,如果没有,我们返回 nil 以停止对账过程
	memcached := &cachev1alpha1.Memcached{}
	err := r.Get(ctx, req.NamespacedName, memcached)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// 如果未找到自定义资源,通常意味着它已被删除或未创建
			// 这样,我们将停止调整过程
			log.Info("memcached resource not found. Ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		// 读取对象时出错 - 重新排队请求。
		log.Error(err, "Failed to get memcached")
		return ctrl.Result{}, err
	}

	// 当没有可用状态时,我们将状态设置为未知。
	if len(memcached.Status.Conditions) == 0 {
		meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Starting reconciliation"})
		if err = r.Status().Update(ctx, memcached); err != nil {
			log.Error(err, "Failed to update Memcached status")
			return ctrl.Result{}, err
		}

		// 在更新状态后,让我们重新获取 memcached 自定义资源
		// 这样我们就可以获得集群中资源的最新状态,避免
		// 引发错误"对象已被修改,请将
		// 您的更改应用于最新版本并重试",这将在后续操作中
		// 如果我们再次尝试更新它,将重新触发对账过程
		if err := r.Get(ctx, req.NamespacedName, memcached); err != nil {
			log.Error(err, "Failed to re-fetch memcached")
			return ctrl.Result{}, err
		}
	}

	// 检查部署是否已经存在,如果不存在则创建一个新部署。
	found := &appsv1.Deployment{}
	err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
	if err != nil && apierrors.IsNotFound(err) {
		// 定义新的部署 
		dep, err := r.deploymentForMemcached(memcached)
		if err != nil {
			log.Error(err, "Failed to define new Deployment resource for Memcached")

			// 以下实现将更新状态
			meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
				Status: metav1.ConditionFalse, Reason: "Reconciling",
				Message: fmt.Sprintf("Failed to create Deployment for the custom resource (%s): (%s)", memcached.Name, err)})

			if err := r.Status().Update(ctx, memcached); err != nil {
				log.Error(err, "Failed to update Memcached status")
				return ctrl.Result{}, err
			}

			return ctrl.Result{}, err
		}

		log.Info("Creating a new Deployment",
			"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
		if err = r.Create(ctx, dep); err != nil {
			log.Error(err, "无法创建新的部署",
				"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
			return ctrl.Result{}, err
		}

		// 部署成功创建
		// 我们将重新排队调整,以确保状态
		// 并继续进行下一步操作
		return ctrl.Result{RequeueAfter: time.Minute}, nil
	} else if err != nil {
		log.Error(err, "Failed to get Deployment")
		// 让我们返回错误,以便重新触发对账。
		return ctrl.Result{}, err
	}

	// CRD API 定义了 Memcached 类型具有一个 MemcachedSpec.Size 字段
	// 用于设置集群中部署实例的数量到所需状态。
	// 因此,以下代码将确保部署的大小与我们正在对账的
	// 自定义资源的 Size 规格中定义的相同。
	size := memcached.Spec.Size
	if *found.Spec.Replicas != size {
		found.Spec.Replicas = &size
		if err = r.Update(ctx, found); err != nil {
			log.Error(err, "Failed to update Deployment",
				"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)

			// 在更新状态之前重新获取 memcached 自定义资源,
			// 以便我们能够获得集群中资源的最新状态,并避免
			// 引发错误"对象已被修改,请将您的更改应用于
			// 最新版本并重试",这将重新触发调整。
			if err := r.Get(ctx, req.NamespacedName, memcached); err != nil {
				log.Error(err, "Failed to re-fetch memcached")
				return ctrl.Result{}, err
			}

			// 以下实现将更新状态
			meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
				Status: metav1.ConditionFalse, Reason: "Resizing",
				Message: fmt.Sprintf("Failed to update the size for the custom resource (%s): (%s)", memcached.Name, err)})

			if err := r.Status().Update(ctx, memcached); err != nil {
				log.Error(err, "Failed to update Memcached status")
				return ctrl.Result{}, err
			}

			return ctrl.Result{}, err
		}

		// 现在我们更新了大小,我们想要重新排队对账
		// 以便确保在更新之前我们拥有资源的最新状态。
		// 此外,它还将有助于确保集群上的期望状态。
		return ctrl.Result{Requeue: true}, nil
	}

	// 以下实现将更新状态
	meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
		Status: metav1.ConditionTrue, Reason: "Reconciling",
		Message: fmt.Sprintf("自定义资源(%s)部署成功,创建了 %d 个副本", memcached.Name, size)})

	if err := r.Status().Update(ctx, memcached); err != nil {
		log.Error(err, "Failed to update Memcached status")
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

// SetupWithManager 将控制器与管理器进行设置。
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&cachev1alpha1.Memcached{}).
		Owns(&appsv1.Deployment{}).
		Named("memcached").
		Complete(r)
}

// deploymentForMemcached 返回一个 Memcached 部署对象。
func (r *MemcachedReconciler) deploymentForMemcached(
	memcached *cachev1alpha1.Memcached) (*appsv1.Deployment, error) {
	replicas := memcached.Spec.Size
	image := "memcached:1.6.26-alpine3.19"

	dep := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      memcached.Name,
			Namespace: memcached.Namespace,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app.kubernetes.io/name": "project"},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"app.kubernetes.io/name": "project"},
				},
				Spec: corev1.PodSpec{
					SecurityContext: &corev1.PodSecurityContext{
						RunAsNonRoot: ptr.To(true),
						SeccompProfile: &corev1.SeccompProfile{
							Type: corev1.SeccompProfileTypeRuntimeDefault,
						},
					},
					Containers: []corev1.Container{{
						Image:           image,
						Name:            "memcached",
						ImagePullPolicy: corev1.PullIfNotPresent,
						// 确保容器的限制性上下文
						// 详细信息请参考: https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
						SecurityContext: &corev1.SecurityContext{
							RunAsNonRoot:             ptr.To(true),
							RunAsUser:                ptr.To(int64(1001)),
							AllowPrivilegeEscalation: ptr.To(false),
							Capabilities: &corev1.Capabilities{
								Drop: []corev1.Capability{
									"ALL",
								},
							},
						},
						Ports: []corev1.ContainerPort{{
							ContainerPort: 11211,
							Name:          "memcached",
						}},
						Command: []string{"memcached", "--memory-limit=64", "-o", "modern", "-v"},
					}},
				},
			},
		},
	}

	// 设置 Deployment 的 ownerRef
	// 更多信息: https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/
	if err := ctrl.SetControllerReference(memcached, dep, r.Scheme); err != nil {
		return nil, err
	}
	return dep, nil
}

深入了解控制器实现

将管理器设置为监视资源

整个想法是关注对控制器重要的资源。当控制器感兴趣的资源发生变化时,Watch 会触发控制器的对账循环,确保资源的实际状态与控制器逻辑中定义的期望状态相匹配。

注意我们是如何配置管理器以监控事件,比如 Memcached 类型的自定义资源(CR)的创建、更新或删除,以及控制器管理和拥有的部署的任何更改:

// SetupWithManager 将控制器与管理器设置起来。
// 部署也会被监视以确保其
// 在集群中的期望状态。
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
		// 监视 Memcached 自定义资源,并在其创建、更新或删除时触发一致性检查。
		For(&cachev1alpha1.Memcached{}).
		// 监视由 Memcached 控制器管理的 Deployment。如果该控制器拥有和管理的 Deployment 发生任何变化,将触发对账,确保集群状态与期望状态一致。
		Owns(&appsv1.Deployment{}).
		Complete(r)
    }

但是,管理程序如何知道哪些资源是由它拥有的呢?

我们不希望我们的控制器监视集群中的任何部署并触发我们的调整循环。相反,我们只希望在运行我们 Memcached 实例的特定部署发生更改时触发调整。例如,如果有人不小心删除了我们的部署或更改了副本数,我们希望触发调整,以确保它返回到期望的状态。

监控程序知道要观察哪个部署,因为我们设置了 ownerRef(拥有者引用):

if err := ctrl.SetControllerReference(memcached, dep, r.Scheme); err != nil {
    return nil, err
}

授予权限

确保控制器拥有管理其资源所需的权限(即创建、获取、更新和列出)是很重要的。

RBAC 权限 现在通过 RBAC 标记 进行配置,这些标记用于生成和更新位于 config/rbac/ 中的清单文件。这些标记可以在每个控制器的 Reconcile() 方法中找到(并应在此处定义),请查看我们示例中的实现方式:

// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch

在对控制器进行更改后,运行 make generate 命令。这样会提示 controller-gen 刷新位于 config/rbac 下的文件。

config/rbac/role.yaml: Our RBAC Role generated
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: manager-role
rules:
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
  - patch
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - apps
  resources:
  - deployments
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - cache.example.com
  resources:
  - memcacheds
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - cache.example.com
  resources:
  - memcacheds/finalizers
  verbs:
  - update
- apiGroups:
  - cache.example.com
  resources:
  - memcacheds/status
  verbs:
  - get
  - patch
  - update

监控程序 (main.go)

cmd/main.go 文件中的 Manager 负责管理您应用程序中的控制器。

cmd/main.go: Our main.go
/*
 版权 2025 The Kubernetes authors.
 根据 Apache 许可证,版本 2.0("许可证")授权;
 您只能在遵守该许可证的情况下使用此文件。
 您可以在以下地址获得该许可证的副本:

   http://www.apache.org/licenses/LICENSE-2.0 

 除非适用法律要求或书面同意,否则根据该许可证分发的软件
 是按"原样"基础分发的,
 不提供任何形式的担保或条件,无论是明示还是否明示。
有关许可证下的具体权限和限制,请参见许可证。
*/

package main

import (
	"crypto/tls"
	"flag"
	"os"
	"path/filepath"

	// 导入所有 Kubernetes 客户端身份验证插件(例如 Azure、GCP、OIDC 等)
	// 确保 exec-entrypoint 和 run 可以使用它们。
	_ "k8s.io/client-go/plugin/pkg/client/auth"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/certwatcher"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

	cachev1alpha1 "example.com/memcached/api/v1alpha1"
	"example.com/memcached/internal/controller"
	// +kubebuilder:scaffold:imports
)

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

	utilruntime.Must(cachev1alpha1.AddToScheme(scheme))
	// +kubebuilder:scaffold:scheme}

// nolint:gocyclo
func main() {
	var metricsAddr string
	var metricsCertPath, metricsCertName, metricsCertKey string
	var webhookCertPath, webhookCertName, webhookCertKey string
	var enableLeaderElection bool
	var probeAddr string
	var secureMetrics bool
	var enableHTTP2 bool
	var tlsOpts []func(*tls.Config)
	flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
		"使用 :8443 进行 HTTPS,使用 :8080 进行 HTTP,或者保持为 0 以禁用指标服务。")
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"启用此功能将确保只有一个活动的控制管理器。")
	flag.BoolVar(&secureMetrics, "metrics-secure", true,
		"如果设置,指标端点将通过 HTTPS 安全提供。请使用 --metrics-secure=false 以改为使用 HTTP。")
	flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
	flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
	flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
	flag.StringVar(&metricsCertPath, "metrics-cert-path", "",
		"The directory that contains the metrics server certificate.")
	flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
	flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
	flag.BoolVar(&enableHTTP2, "enable-http2", false,
		"If set, HTTP/2 will be enabled for the metrics and webhook servers")
	opts := zap.Options{
		Development: true,
	}
	opts.BindFlags(flag.CommandLine)
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

	// 如果 enable-http2 标志为 false(默认值),则应禁用 http/2
	// 由于其存在的漏洞。更具体地说,禁用 http/2 将
	// 防止受到 HTTP/2 流取消和
	// 快速重置 CVE 的漏洞影响。有关更多信息,请参见:
	// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
	// - https://github.com/advisories/GHSA-4374-p667-p6c8
	disableHTTP2 := func(c *tls.Config) {
		setupLog.Info("disabling http/2")
		c.NextProtos = []string{"http/1.1"}
	}

	if !enableHTTP2 {
		tlsOpts = append(tlsOpts, disableHTTP2)
	}

	// 创建用于指标和 Webhook 证书的监视器
	var metricsCertWatcher, webhookCertWatcher *certwatcher.CertWatcher

	// 初始Webhook TLS选项
	webhookTLSOpts := tlsOpts

	if len(webhookCertPath) > 0 {
		setupLog.Info("使用提供的证书初始化 Webhook 证书监视器",
			"webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey)

		var err error
		webhookCertWatcher, err = certwatcher.New(
			filepath.Join(webhookCertPath, webhookCertName),
			filepath.Join(webhookCertPath, webhookCertKey),
		)
		if err != nil {
			setupLog.Error(err, "Failed to initialize webhook certificate watcher")
			os.Exit(1)
		}

		webhookTLSOpts = append(webhookTLSOpts, func(config *tls.Config) {
			config.GetCertificate = webhookCertWatcher.GetCertificate
		})
	}

	webhookServer := webhook.NewServer(webhook.Options{
		TLSOpts: webhookTLSOpts,
	})

	// 指标端点在 'config/default/kustomization.yaml' 中启用。指标选项配置服务器。
	// 更多信息:
	// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.4/pkg/metrics/server
	// - https://book.kubebuilder.io/reference/metrics.html
	metricsServerOptions := metricsserver.Options{
		BindAddress:   metricsAddr,
		SecureServing: secureMetrics,
		TLSOpts:       tlsOpts,
	}

	if secureMetrics {
		// FilterProvider 用于通过认证和授权来保护指标端点。
		// 这些配置确保只有授权用户和服务账户
		// 可以访问指标端点。RBAC 配置在 'config/rbac/kustomization.yaml' 中。更多信息:
		// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.4/pkg/metrics/filters#WithAuthenticationAndAuthorization
		metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
	}

	// 如果未指定证书,controller-runtime 将自动
	// 为指标服务器生成自签名证书。虽然对于开发和测试来说很方便,
	// 但这种设置不建议用于生产环境。
	//
	// TODO(用户):如果您启用 certManager,请取消注释以下行:
	// - [METRICS-WITH-CERTS] 在 config/default/kustomization.yaml 中生成并使用
	// 由 cert-manager 管理的指标服务器证书。
	// - [PROMETHEUS-WITH-CERTS] 在 config/prometheus/kustomization.yaml 中用于 TLS 认证。
	if len(metricsCertPath) > 0 {
		setupLog.Info("使用提供的证书初始化指标证书监视器",
			"metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey)

		var err error
		metricsCertWatcher, err = certwatcher.New(
			filepath.Join(metricsCertPath, metricsCertName),
			filepath.Join(metricsCertPath, metricsCertKey),
		)
		if err != nil {
			setupLog.Error(err, "to initialize metrics certificate watcher", "error", err)
			os.Exit(1)
		}

		metricsServerOptions.TLSOpts = append(metricsServerOptions.TLSOpts, func(config *tls.Config) {
			config.GetCertificate = metricsCertWatcher.GetCertificate
		})
	}

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		Metrics:                metricsServerOptions,
		WebhookServer:          webhookServer,
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "4b13cc52.example.com",
		// LeaderElectionReleaseOnCancel 定义了当管理器结束时,领导者是否应自愿辞职
		// 当管理器停止时,这要求二进制文件立即结束,否则这个设置是不安全的。设置这个可以显著
		// 加快自愿领导者的过渡,因为新的领导者不必首先等待
		// LeaseDuration 时间。
		//
		// 在提供的默认脚手架中,程序会在
		// 管理器停止后立即结束,因此启用此选项是可以的。不过,
		// 如果您正在进行或打算在管理器停止后执行任何操作,如执行清理,
		// 那么使用它可能是不安全的。
		// LeaderElectionReleaseOnCancel: true,
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	if err = (&controller.MemcachedReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Memcached")
		os.Exit(1)
	}
	// +kubebuilder:scaffold:builder

	if metricsCertWatcher != nil {
		setupLog.Info("Adding metrics certificate watcher to manager")
		if err := mgr.Add(metricsCertWatcher); err != nil {
			setupLog.Error(err, "unable to add metrics certificate watcher to manager")
			os.Exit(1)
		}
	}

	if webhookCertWatcher != nil {
		setupLog.Info("Adding webhook certificate watcher to manager")
		if err := mgr.Add(webhookCertWatcher); err != nil {
			setupLog.Error(err, "unable to add webhook certificate watcher to manager")
			os.Exit(1)
		}
	}

	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

检查集群中运行的项目

此时,您可以通过查看快速入门中定义的步骤来检查在集群上验证项目的步骤,请参见:在集群上运行

下一步

  • 为了更深入地开发您的解决方案,请考虑阅读 CronJob 教程
  • 有关优化您方法的见解,请参阅最佳实践文档。