diff --git a/.gomodcheck.yaml b/.gomodcheck.yaml index 75c5261fde..fe69440d42 100644 --- a/.gomodcheck.yaml +++ b/.gomodcheck.yaml @@ -1,10 +1,10 @@ upstreamRefs: - - k8s.io/api - - k8s.io/apiextensions-apiserver - - k8s.io/apimachinery - - k8s.io/apiserver - - k8s.io/client-go - - k8s.io/component-base + #- k8s.io/api + #- k8s.io/apiextensions-apiserver + #- k8s.io/apimachinery + #- k8s.io/apiserver + #- k8s.io/client-go + #- k8s.io/component-base # k8s.io/klog/v2 -> conflicts with k/k deps # k8s.io/utils -> conflicts with k/k deps diff --git a/hack/check-everything.sh b/hack/check-everything.sh index b05d4059af..49bae41d31 100755 --- a/hack/check-everything.sh +++ b/hack/check-everything.sh @@ -41,6 +41,62 @@ tmp_bin=/tmp/cr-tests-bin ) export KUBEBUILDER_ASSETS="$(${tmp_bin}/setup-envtest use --use-env -p path "${ENVTEST_K8S_VERSION}")" +# HACK +k8s_clone_dir=$tmp_root/kubernetes +( + k8s_repo_url=https://github.com/kubernetes/kubernetes.git + + echo "Cloning Kube repository from $k8s_repo_url..." + git clone $k8s_repo_url $k8s_clone_dir + + cd $k8s_clone_dir + + pr_number="128078" + echo "Fetching pull request #$pr_number..." + git fetch origin pull/$pr_number/head:pr-$pr_number + + echo "Checking out pull request #$pr_number..." + git checkout pr-$pr_number + + echo "Building Kube from source code..." + make +) +k8s_bin_dir=$( + k8s_output_dir=${k8s_clone_dir}/_output/local/go/bin + if [ -d "${k8s_output_dir}" ]; then + cd ${k8s_output_dir} + pwd + else + echo "Directory ${k8s_output_dir} does not exist." + exit 1 + fi +) +echo "Replacing kube-apiserver binary from ${k8s_bin_dir} to ${KUBEBUILDER_ASSETS}" +cp -f "${k8s_bin_dir}/kube-apiserver" "${KUBEBUILDER_ASSETS}/kube-apiserver" + +etcd_download_dir=${tmp_root}/etcd +( + etcd_version="v3.5.15" + etcd_arch="linux-amd64" + + etcd_download_url="https://github.com/etcd-io/etcd/releases/download/${etcd_version}/etcd-${etcd_version}-${etcd_arch}.tar.gz" + + echo "Downloading etcd ${etcd_version} for ${etcd_arch}..." + curl -fL ${etcd_download_url} -o etcd-${etcd_version}-${etcd_arch}.tar.gz + + echo "Extracting etcd to ${etcd_download_dir}..." + mkdir -p ${etcd_download_dir} + tar xzvf etcd-${etcd_version}-${etcd_arch}.tar.gz -C ${etcd_download_dir} --strip-components=1 + + echo "etcd ${etcd_version} for ${etcd_arch} is downloaded and extracted to ${etcd_download_dir}." +) +echo "Replacing etcd binary from ${etcd_download_dir} to ${KUBEBUILDER_ASSETS}" +cp -f "${etcd_download_dir}/etcd" "${KUBEBUILDER_ASSETS}/etcd" + +echo "Enabling WatchListClient feature" +export KUBE_FEATURE_WatchListClient=true +# END OF HACK + # Run tests. ${hack_dir}/test-all.sh diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index cbeb1c43bc..b011004cdb 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -201,9 +201,9 @@ var _ = Describe("application", func() { Named("my_controller-0"). Owns(&appsv1.ReplicaSet{}). Build(typedNoop) - // If we ever allow Owns() without For() we need to update the code to error - // out on Owns() if the request type is different from reconcile.Request just - // like we do in For(). + // If we ever allow Owns() without For() we need to update the code to error + // out on Owns() if the request type is different from reconcile.Request just + // like we do in For(). Expect(err).To(MatchError("Owns() can only be used together with For()")) Expect(instance).To(BeNil()) }) @@ -711,7 +711,7 @@ func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager Expect(err).NotTo(HaveOccurred()) By("Waiting for the Deployment Reconcile") - Eventually(ch).Should(Receive(Equal(reconcile.Request{ + Eventually(ch, "5s").Should(Receive(Equal(reconcile.Request{ NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))) By("Creating a ReplicaSet") diff --git a/pkg/client/watch_list_test.go b/pkg/client/watch_list_test.go new file mode 100644 index 0000000000..520d51a4ff --- /dev/null +++ b/pkg/client/watch_list_test.go @@ -0,0 +1,81 @@ +package client_test + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "sigs.k8s.io/controller-runtime/pkg/envtest" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" +) + +var _ = Describe("WatchList", func() { + It("should work against the kube-apiserver", func() { + + Expect(clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient)).To(BeTrue()) + + testenv = &envtest.Environment{} + cfg, err := testenv.Start() + Expect(err).NotTo(HaveOccurred()) + clientset, err = kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + opts := metav1.ListOptions{} + opts.AllowWatchBookmarks = true + opts.SendInitialEvents = ptr.To(true) + opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan + w, err := clientset.CoreV1().Secrets("kube-system").Watch(ctx, opts) + Expect(err).NotTo(HaveOccurred()) + defer w.Stop() + + receivedWatchListStreamFromTheServer := false + func() { + for { + select { + case <-ctx.Done(): + return + case event, ok := <-w.ResultChan(): + if !ok { + panic("unexpected watch close") + + } + if event.Type == watch.Error { + panic(fmt.Sprintf("unexpected watch event: %v", apierrors.FromObject(event.Object))) + } + meta, err := meta.Accessor(event.Object) + Expect(err).NotTo(HaveOccurred()) + + switch event.Type { + case watch.Bookmark: + if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] != "true" { + continue + } + if len(meta.GetAnnotations()["kubernetes.io/initial-events-list-blueprint"]) == 0 { + continue + } + receivedWatchListStreamFromTheServer = true + return + case watch.Added, watch.Modified, watch.Deleted, watch.Error: + default: + panic(fmt.Sprintf("unexpected watch event: %v", event.Object)) + } + } + } + }() + + Expect(receivedWatchListStreamFromTheServer).To(BeTrue()) + }) +}) diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 504a671c8a..4d5bd7423c 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -210,7 +210,7 @@ var _ = Describe("Source", func() { informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30) depInformer = informerFactory.Apps().V1().ReplicaSets().Informer() informerFactory.Start(stopTest) - Eventually(depInformer.HasSynced).Should(BeTrue()) + Eventually(depInformer.HasSynced, "5s").Should(BeTrue()) c = make(chan struct{}) rs = &appsv1.ReplicaSet{