Writing controller tests
Testing Kubernetes controllers is a big subject, and the boilerplate testing files generated for you by kubebuilder are fairly minimal.
To walk you through integration testing patterns for Kubebuilder-generated controllers, we’ll revisit the CronJob we built in our first tutorial and write a simple test for it.
The basic approach is that, in your generated suite_test.go file, you will use envtest to create a local Kubernetes API server, instantiate and run your controllers, and then write additional *_test.go files to test them using Ginkgo.
If you want to tweak how your envtest cluster is configured, see the Configuring envtest for integration tests section as well as the envtest docs.
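For illustration, a sketch of a tweaked envtest.Environment is shown below; the specific timeout values and the binary path are placeholder assumptions, not something the scaffold generates:
testEnv := &envtest.Environment{
	// Load the project's CRDs, just like the scaffolded suite does.
	CRDDirectoryPaths:     []string{filepath.Join("..", "..", "config", "crd", "bases")},
	ErrorIfCRDPathMissing: true,
	// Where to find the etcd and kube-apiserver test binaries; this is often
	// supplied via the KUBEBUILDER_ASSETS environment variable instead.
	BinaryAssetsDirectory: "/path/to/envtest/binaries", // hypothetical path
	// Give slower machines more time to start and stop the control plane.
	ControlPlaneStartTimeout: 60 * time.Second,
	ControlPlaneStopTimeout:  60 * time.Second,
	// Stream kube-apiserver and etcd output into the test logs when debugging.
	AttachControlPlaneOutput: true,
}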
Test Environment Setup
Imports
When we created the CronJob API with kubebuilder create api in a previous chapter, Kubebuilder already did some test work for you. Kubebuilder scaffolded an internal/controller/suite_test.go file that does the bare bones of setting up a test environment.
First, it will contain the necessary imports.
package controller
import (
"context"
"os"
"path/filepath"
"testing"
ctrl "sigs.k8s.io/controller-runtime"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
batchv1 "tutorial.kubebuilder.io/project/api/v1"
// +kubebuilder:scaffold:imports
)
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
Now, let’s go through the generated code.
var (
ctx context.Context
cancel context.CancelFunc
testEnv *envtest.Environment
cfg *rest.Config
k8sClient client.Client // You'll be using this client in your tests.
)
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controller Suite")
}
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
ctx, cancel = context.WithCancel(context.TODO())
var err error
The CronJob Kind is added to the runtime scheme used by the test environment. This ensures that the CronJob API is registered with the scheme, allowing the test controller to recognize and interact with CronJob resources.
err = batchv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
After the schemas, you will see the following marker. This marker is what allows new schemas to be added here automatically when a new API is added to the project.
// +kubebuilder:scaffold:scheme
The envtest environment is configured to load Custom Resource Definitions (CRDs) from the specified directory. This setup enables the test environment to recognize and interact with the custom resources defined by these CRDs.
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
}
// Retrieve the first found binary directory to allow running tests from IDEs
if getFirstFoundEnvTestBinaryDir() != "" {
testEnv.BinaryAssetsDirectory = getFirstFoundEnvTestBinaryDir()
}
Then, we start the envtest cluster.
// cfg is defined in this file globally.
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
A client is created for our test CRUD operations.
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
One thing that this autogenerated file is missing, however, is a way to actually start your controller. The code above will set up a client for interacting with your custom Kind, but will not be able to test your controller behavior. If you want to test your custom controller logic, you’ll need to add some familiar-looking manager logic to your BeforeSuite() function, so you can register your custom controller to run on this test cluster.
You may notice that the code below runs your controller with nearly identical logic to your CronJob project’s main.go! The only difference is that the manager is started in a separate goroutine so it does not block the cleanup of envtest when you’re done running your tests.
Note that we set up both a “live” k8s client and a separate client from the manager. This is because when making assertions in tests, you generally want to assert against the live state of the API server. If you use the client from the manager (k8sManager.GetClient), you’d end up asserting against the contents of the cache instead, which is slower and can introduce flakiness into your tests. We could use the manager’s APIReader to accomplish the same thing, but that would leave us with two clients in our test assertions and setup (one for reading, one for writing), and it’d be easy to make mistakes.
Note that we keep the reconciler running against the manager’s cache client, though – we want our controller to behave as it would in production, and we use features of the cache (like indices) in our controller which aren’t available when talking directly to the API server.
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())
err = (&CronJobReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
})
Kubebuilder also generates boilerplate functions for cleaning up envtest and actually running your test files in your internal/controller/ directory. You won’t need to touch these.
var _ = AfterSuite(func() {
By("tearing down the test environment")
cancel()
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
Now that you have your controller running on a test cluster and a client ready to perform operations on your CronJob, we can start writing integration tests!
// getFirstFoundEnvTestBinaryDir locates the first binary in the specified path.
// ENVTEST-based tests depend on specific binaries, usually located in paths set by
// controller-runtime. When running tests directly (e.g., via an IDE) without using
// Makefile targets, the 'BinaryAssetsDirectory' must be explicitly configured.
//
// This function streamlines the process by finding the required binaries, similar to
// setting the 'KUBEBUILDER_ASSETS' environment variable. To ensure the binaries are
// properly set up, run 'make setup-envtest' beforehand.
func getFirstFoundEnvTestBinaryDir() string {
basePath := filepath.Join("..", "..", "bin", "k8s")
entries, err := os.ReadDir(basePath)
if err != nil {
logf.Log.Error(err, "Failed to read directory", "path", basePath)
return ""
}
for _, entry := range entries {
if entry.IsDir() {
return filepath.Join(basePath, entry.Name())
}
}
return ""
}
Testing your Controller’s Behavior
Ideally, we should have one <kind>_controller_test.go for each controller scaffolded and called in the suite_test.go. So, let’s write our example test for the CronJob controller (cronjob_controller_test.go).
Imports
As usual, we start with the necessary imports. We also define some utility variables.
package controller
import (
"context"
"reflect"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
cronjobv1 "tutorial.kubebuilder.io/project/api/v1"
)
The first step to writing a simple integration test is to actually create an instance of CronJob you can run tests against. Note that to create a CronJob, you’ll need to create a stub CronJob struct that contains your CronJob’s specifications.
Note that when we create a stub CronJob, the CronJob also needs stubs of its required downstream objects. Without the stubbed Job template spec and the Pod template spec below, the Kubernetes API will not be able to create the CronJob.
var _ = Describe("CronJob controller", func() {
// Define utility constants for object names and testing timeouts/durations and intervals.
const (
CronjobName = "test-cronjob"
CronjobNamespace = "default"
JobName = "test-job"
timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)
Context("When updating CronJob Status", func() {
It("Should increase CronJob Status.Active count when new Jobs are created", func() {
By("By creating a new CronJob")
ctx := context.Background()
cronJob := &cronjobv1.CronJob{
TypeMeta: metav1.TypeMeta{
APIVersion: "batch.tutorial.kubebuilder.io/v1",
Kind: "CronJob",
},
ObjectMeta: metav1.ObjectMeta{
Name: CronjobName,
Namespace: CronjobNamespace,
},
Spec: cronjobv1.CronJobSpec{
Schedule: "1 * * * *",
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
// For simplicity, we only fill out the required fields.
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
// For simplicity, we only fill out the required fields.
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
},
},
}
Expect(k8sClient.Create(ctx, cronJob)).To(Succeed())
After creating this CronJob, let’s check that the CronJob’s Spec fields match what we passed in. Note that, because the k8s apiserver may not have finished creating a CronJob after our Create() call from earlier, we will use Gomega’s Eventually() testing function instead of Expect() to give the apiserver an opportunity to finish creating our CronJob.
Eventually() will repeatedly run the function provided as an argument, once per interval, until either (a) the assertions done by the passed-in Gomega succeed, or (b) the number of attempts multiplied by the interval exceeds the provided timeout value.
In the examples below, timeout and interval are Go Duration values of our choosing.
cronjobLookupKey := types.NamespacedName{Name: CronjobName, Namespace: CronjobNamespace}
createdCronjob := &cronjobv1.CronJob{}
// We'll need to retry getting this newly created CronJob, given that creation may not immediately happen.
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)).To(Succeed())
}, timeout, interval).Should(Succeed())
// Let's make sure our Schedule string value was properly converted/handled.
Expect(createdCronjob.Spec.Schedule).To(Equal("1 * * * *"))
Now that we’ve created a CronJob in our test cluster, the next step is to write a test that actually tests our CronJob controller’s behavior. Let’s test the CronJob controller’s logic responsible for updating CronJob.Status.Active with actively running jobs. We’ll verify that when a CronJob has a single active downstream Job, its CronJob.Status.Active field contains a reference to this Job.
First, we should get the test CronJob we created earlier, and verify that it currently does not have any active jobs. We use Gomega’s Consistently() check here to ensure that the active job count remains 0 over a duration of time.
By("By checking the CronJob has zero active Jobs")
Consistently(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)).To(Succeed())
g.Expect(createdCronjob.Status.Active).To(BeEmpty())
}, duration, interval).Should(Succeed())
Next, we actually create a stubbed Job that will belong to our CronJob, as well as its downstream template specs. We set the Job’s status’s “Active” count to 2 to simulate the Job running two pods, which means the Job is actively running.
We then take the stubbed Job and set its owner reference to point to our test CronJob. This ensures that the test Job belongs to, and is tracked by, our test CronJob. Once that’s done, we create our new Job instance.
By("By creating a new Job")
testJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: JobName,
Namespace: CronjobNamespace,
},
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
// For simplicity, we only fill out the required fields.
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
}
// Note that your CronJob’s GroupVersionKind is required to set up this owner reference.
kind := reflect.TypeOf(cronjobv1.CronJob{}).Name()
gvk := cronjobv1.GroupVersion.WithKind(kind)
controllerRef := metav1.NewControllerRef(createdCronjob, gvk)
testJob.SetOwnerReferences([]metav1.OwnerReference{*controllerRef})
Expect(k8sClient.Create(ctx, testJob)).To(Succeed())
// Note that you can not manage the status values while creating the resource.
// The status field is managed separately to reflect the current state of the resource.
// Therefore, it should be updated using a PATCH or PUT operation after the resource has been created.
// Additionally, it is recommended to use StatusConditions to manage the status. For further information see:
// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
testJob.Status.Active = 2
Expect(k8sClient.Status().Update(ctx, testJob)).To(Succeed())
Adding this Job to our test CronJob should trigger our controller’s reconciler logic. After that, we can write a test that evaluates whether our controller eventually updates our CronJob’s Status field as expected!
By("By checking that the CronJob has one active Job")
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)).To(Succeed(), "should GET the CronJob")
g.Expect(createdCronjob.Status.Active).To(HaveLen(1), "should have exactly one active job")
g.Expect(createdCronjob.Status.Active[0].Name).To(Equal(JobName), "the wrong job is active")
}, timeout, interval).Should(Succeed(), "should list our active job %s in the active jobs list in status", JobName)
})
})
})
After writing all this code, you can run go test ./... in your internal/controller/ directory again to run your new test! If the envtest binaries have not been downloaded yet, running make test from the project root (assuming the standard scaffolded Makefile targets) will fetch them and set KUBEBUILDER_ASSETS for you.
The Status update example above demonstrates a general testing strategy for a custom Kind with downstream objects. By this point, you hopefully have learned the following methods for testing your controller behavior:
- Setting up your controller to run on an envtest cluster
- Writing stubs for creating test objects
- Isolating changes to an object to test specific controller behavior