From 2b6c16cf2463718e0362fb282ede4484019c4caf Mon Sep 17 00:00:00 2001
From: Jussi Maki
Date: Wed, 10 Jul 2024 19:02:56 +0200
Subject: [PATCH] experimental: Add K8s integration tests

These tests are similar in style to test/controlplane: they validate
that, from a given set of K8s objects expressed in YAML, the expected
BPF map contents can be derived through reconciliation.

Signed-off-by: Jussi Maki
---
 pkg/loadbalancer/experimental/k8s_test.go     | 292 ++++++++++++++++++
 .../experimental/testdata/.gitignore          |   1 +
 .../testdata/hostport-simple/expected.maps    |   7 +
 .../testdata/hostport-simple/nginx.yaml       |  36 +++
 .../testdata/hostport-simple/pod.yaml         |  52 ++++
 .../testdata/nodeport-simple/endpoints.yaml   |  21 ++
 .../testdata/nodeport-simple/expected.maps    |  10 +
 .../testdata/nodeport-simple/service.yaml     |  29 ++
 8 files changed, 448 insertions(+)
 create mode 100644 pkg/loadbalancer/experimental/k8s_test.go
 create mode 100644 pkg/loadbalancer/experimental/testdata/.gitignore
 create mode 100644 pkg/loadbalancer/experimental/testdata/hostport-simple/expected.maps
 create mode 100644 pkg/loadbalancer/experimental/testdata/hostport-simple/nginx.yaml
 create mode 100644 pkg/loadbalancer/experimental/testdata/hostport-simple/pod.yaml
 create mode 100644 pkg/loadbalancer/experimental/testdata/nodeport-simple/endpoints.yaml
 create mode 100644 pkg/loadbalancer/experimental/testdata/nodeport-simple/expected.maps
 create mode 100644 pkg/loadbalancer/experimental/testdata/nodeport-simple/service.yaml

diff --git a/pkg/loadbalancer/experimental/k8s_test.go b/pkg/loadbalancer/experimental/k8s_test.go
new file mode 100644
index 0000000000000..82ccce703c82d
--- /dev/null
+++ b/pkg/loadbalancer/experimental/k8s_test.go
@@ -0,0 +1,292 @@
+package experimental
+
+import (
+	"context"
+	"os"
+	"path"
+	"slices"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cilium/hive/cell"
+	"github.com/cilium/hive/hivetest"
+	"github.com/cilium/statedb"
+	"github.com/cilium/statedb/reconciler"
+	"github.com/cilium/stream"
+	"github.com/stretchr/testify/require"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sRuntime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+
+	"github.com/cilium/cilium/pkg/datapath/tables"
+	"github.com/cilium/cilium/pkg/hive"
+	"github.com/cilium/cilium/pkg/k8s"
+	"github.com/cilium/cilium/pkg/k8s/resource"
+	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
+	slim_fake "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned/fake"
+	"github.com/cilium/cilium/pkg/maps/lbmap"
+	"github.com/cilium/cilium/pkg/metrics"
+	"github.com/cilium/cilium/pkg/option"
+	"github.com/cilium/cilium/pkg/testutils"
+)
+
+var (
+	slimDecoder k8sRuntime.Decoder
+)
+
+func init() {
+	slimScheme := k8sRuntime.NewScheme()
+	slim_fake.AddToScheme(slimScheme)
+	slimScheme.AddKnownTypes(slim_corev1.SchemeGroupVersion, &metav1.List{})
+	slimDecoder = serializer.NewCodecFactory(slimScheme).UniversalDeserializer()
+}
+
+// decodeObject decodes a single K8s object of type [Obj] from a YAML file.
+func decodeObject[Obj k8sRuntime.Object](t *testing.T, file string) Obj {
+	bytes, err := os.ReadFile(file)
+	require.NoError(t, err, "ReadFile(%s)", file)
+	obj, _, err := slimDecoder.Decode(bytes, nil, nil)
+	require.NoError(t, err, "Decode(%s)", file)
+	return obj.(Obj)
+}
+
+// TODO: use EndpointSlices instead of Endpoints
+func decodeEndpoints(t *testing.T, file string) *k8s.Endpoints {
+	eps := decodeObject[*slim_corev1.Endpoints](t, file)
+	return k8s.ParseEndpoints(eps)
+}
+
+// readObjects decodes all "<prefix>*.yaml" files in [dataDir] into
+// objects of type [Obj].
+func readObjects[Obj k8sRuntime.Object](t *testing.T, dataDir string, prefix string) (out []Obj) {
+	ents, err := os.ReadDir(dataDir)
+	require.NoError(t, err, "ReadDir(%s)", dataDir)
+
+	for _, ent := range ents {
+		if strings.HasPrefix(ent.Name(), prefix) && strings.HasSuffix(ent.Name(), ".yaml") {
+			out = append(out, decodeObject[Obj](t, path.Join(dataDir, ent.Name())))
+		}
+	}
+	return
+}
+
+func upsertEvent[Obj k8sRuntime.Object](obj Obj) resource.Event[Obj] {
+	return resource.Event[Obj]{
+		Object: obj,
+		Kind:   resource.Upsert,
+		Done:   func(error) {},
+	}
+}
+
+func TestIntegrationK8s(t *testing.T) {
+	testutils.PrivilegedTest(t)
+
+	// TODO: clean up once there's a LBMap cell of sorts. Need
+	// support for unpinned maps though...?
+	lbmap.Init(lbmap.InitParams{
+		IPv4:                     true,
+		IPv6:                     true,
+		MaxSockRevNatMapEntries:  1000,
+		ServiceMapMaxEntries:     1000,
+		BackEndMapMaxEntries:     1000,
+		RevNatMapMaxEntries:      1000,
+		AffinityMapMaxEntries:    1000,
+		SourceRangeMapMaxEntries: 1000,
+		MaglevMapMaxEntries:      1000,
+	})
+
+	log := hivetest.Logger(t)
+
+	dirs, err := os.ReadDir("testdata")
+	require.NoError(t, err, "ReadDir(testdata)")
+
+	require.NoError(t, lbmap.Service4MapV2.CreateUnpinned())
+	require.NoError(t, lbmap.Service6MapV2.CreateUnpinned())
+	require.NoError(t, lbmap.Backend4MapV3.CreateUnpinned())
+	require.NoError(t, lbmap.Backend6MapV3.CreateUnpinned())
+	require.NoError(t, lbmap.RevNat4Map.CreateUnpinned())
+	require.NoError(t, lbmap.RevNat6Map.CreateUnpinned())
+	require.NoError(t, lbmap.AffinityMatchMap.CreateUnpinned())
+
+	for _, ent := range dirs {
+		if !ent.IsDir() {
+			continue
+		}
+
+		testDataPath := path.Join("testdata", ent.Name())
+
+		services :=
+			stream.Concat(
+				stream.Just(
+					resource.Event[*slim_corev1.Service]{
+						Kind: resource.Sync,
+						Done: func(error) {},
+					}),
+				stream.Map(
+					stream.FromSlice(readObjects[*slim_corev1.Service](t, testDataPath, "service")),
+					upsertEvent))
+
+		pods :=
+			stream.Concat(
+				stream.Just(
+					resource.Event[*slim_corev1.Pod]{
+						Kind: resource.Sync,
+						Done: func(error) {},
+					}),
+				stream.Map(
+					stream.FromSlice(readObjects[*slim_corev1.Pod](t, testDataPath, "pod")),
+					upsertEvent))
+
+		endpoints :=
+			stream.Concat(
+				stream.Just(
+					resource.Event[*k8s.Endpoints]{
+						Kind: resource.Sync,
+						Done: func(error) {},
+					}),
+				stream.Map(
+					stream.Map(
+						stream.FromSlice(readObjects[*slim_corev1.Endpoints](t, testDataPath, "endpoints")),
+						k8s.ParseEndpoints,
+					),
+					upsertEvent))
+
+		var (
+			writer *Writer
+			db     *statedb.DB
+		)
+
+		// TODO: Rework the test to use one hive instance. Service/Backend ID allocation needs to be reset though?
+		h := hive.New(
+			// FIXME. Need this to avoid 1 second delay on metric operations.
+			// Figure out a better way to deal with this.
+			metrics.Cell,
+			cell.Provide(func() *option.DaemonConfig {
+				return &option.DaemonConfig{}
+			}),
+
+			cell.Module(
+				"loadbalancer-test",
+				"Test module",
+
+				cell.Config(DefaultConfig),
+
+				cell.Provide(func() streamsOut {
+					return streamsOut{
+						ServicesStream:  services,
+						EndpointsStream: endpoints,
+						PodsStream:      pods,
+					}
+				}),
+
+				cell.Invoke(func(db_ *statedb.DB, w *Writer) {
+					db = db_
+					writer = w
+				}),
+
+				// Provides [Writer] API and the load-balancing tables.
+				TablesCell,
+
+				// Reflects Kubernetes services and endpoints to the load-balancing tables
+				// using the [Writer].
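+				// Pods are reflected as well: the HostPort entries
+				// exercised by testdata/hostport-simple are derived from
+				// the pods' hostPort declarations.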
+				ReflectorCell,
+
+				// Reconcile tables to BPF maps
+				BPFReconcilerCell,
+
+				cell.Provide(
+					tables.NewNodeAddressTable,
+					statedb.RWTable[tables.NodeAddress].ToTable,
+				),
+				cell.Invoke(func(db *statedb.DB, nodeAddrs statedb.RWTable[tables.NodeAddress]) {
+					db.RegisterTable(nodeAddrs)
+					txn := db.WriteTxn(nodeAddrs)
+					nodeAddrs.Insert(
+						txn,
+						tables.NodeAddress{
+							Addr:       nodePortAddrs[0],
+							NodePort:   true,
+							Primary:    true,
+							DeviceName: "eth0",
+						},
+					)
+					nodeAddrs.Insert(
+						txn,
+						tables.NodeAddress{
+							Addr:       nodePortAddrs[1],
+							NodePort:   true,
+							Primary:    true,
+							DeviceName: "eth0",
+						},
+					)
+					txn.Commit()
+				}),
+			),
+		)
+
+		hive.AddConfigOverride(h, func(cfg *Config) {
+			cfg.EnableExperimentalLB = true
+		})
+
+		require.NoError(t, h.Start(log, context.TODO()))
+
+		// Wait for reconciliation: loop until all frontends have been
+		// marked done by the reconciler, or fail after a timeout.
+		timeout := time.After(5 * time.Second)
+		for {
+			iter, watch := writer.Frontends().AllWatch(db.ReadTxn())
+			allDone := true
+			count := 0
+			for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
+				if obj.Status.Kind != reconciler.StatusKindDone {
+					allDone = false
+				}
+				count++
+			}
+			if count > 0 && allDone {
+				break
+			}
+			select {
+			case <-timeout:
+				writer.DebugDump(db.ReadTxn(), os.Stdout)
+				t.Fatalf("timed out waiting for reconciliation")
+			case <-watch:
+			}
+		}
+
+		expectedData, err := os.ReadFile(path.Join(testDataPath, "expected.maps"))
+		require.NoError(t, err, "ReadFile(expected.maps)")
+		expected := strings.Split(strings.TrimSpace(string(expectedData)), "\n")
+		actual := dump(frontendAddrs[0])
+
+		if !slices.Equal(expected, actual) {
+			// TODO: Nicer diff
+			t.Errorf("Mismatching BPF map contents:\nexpected:\n%s\nactual:\n%s",
+				strings.Join(expected, "\n"),
+				strings.Join(actual, "\n"),
+			)
+			actualPath := path.Join(testDataPath, "actual.maps")
+			os.WriteFile(
+				actualPath,
+				[]byte(strings.Join(actual, "\n")+"\n"),
+				0644,
+			)
+			t.Logf("Wrote actual BPF map dump to %s", actualPath)
+
+			writer.DebugDump(db.ReadTxn(), os.Stdout) // TODO: also dump this to files?
+		}
+
+		h.Stop(log, context.TODO())
+
+		// FIXME: Clean up by deleting the k8s objects instead!
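+		// Until then, wipe the BPF maps directly so that the next test
+		// case starts from an empty state.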
+
+		require.NoError(t, lbmap.Service4MapV2.DeleteAll())
+		require.NoError(t, lbmap.Service6MapV2.DeleteAll())
+		require.NoError(t, lbmap.Backend4MapV3.DeleteAll())
+		require.NoError(t, lbmap.Backend6MapV3.DeleteAll())
+		require.NoError(t, lbmap.RevNat4Map.DeleteAll())
+		require.NoError(t, lbmap.RevNat6Map.DeleteAll())
+		require.NoError(t, lbmap.AffinityMatchMap.DeleteAll())
+	}
+}
diff --git a/pkg/loadbalancer/experimental/testdata/.gitignore b/pkg/loadbalancer/experimental/testdata/.gitignore
new file mode 100644
index 0000000000000..3b6ae9469fbf1
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/.gitignore
@@ -0,0 +1 @@
+*/actual.maps
diff --git a/pkg/loadbalancer/experimental/testdata/hostport-simple/expected.maps b/pkg/loadbalancer/experimental/testdata/hostport-simple/expected.maps
new file mode 100644
index 0000000000000..047897f6dcbee
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/hostport-simple/expected.maps
@@ -0,0 +1,7 @@
+BE: ID=1 ADDR=10.244.1.113:80 STATE=active
+REV: ID=1 ADDR=
+REV: ID=2 ADDR=
+SVC: ID=1 ADDR= SLOT=0 BEID=0 COUNT=1 FLAGS=HostPort+non-routable
+SVC: ID=1 ADDR= SLOT=1 BEID=1 COUNT=0 FLAGS=HostPort+non-routable
+SVC: ID=2 ADDR= SLOT=0 BEID=0 COUNT=1 FLAGS=HostPort
+SVC: ID=2 ADDR= SLOT=1 BEID=1 COUNT=0 FLAGS=HostPort
diff --git a/pkg/loadbalancer/experimental/testdata/hostport-simple/nginx.yaml b/pkg/loadbalancer/experimental/testdata/hostport-simple/nginx.yaml
new file mode 100644
index 0000000000000..724407fbdc31c
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/hostport-simple/nginx.yaml
@@ -0,0 +1,36 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: my-app
+spec:
+  selector:
+    matchLabels:
+      run: my-app
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        run: my-app
+    spec:
+      containers:
+      - name: my-app
+        image: nginx
+        ports:
+        - containerPort: 80
+          hostPort: 4444
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: my-app
+  labels:
+    run: my-app
+spec:
+  type: NodePort
+  ports:
+  - port: 80
+    protocol: TCP
+  selector:
+    run: my-app
diff --git a/pkg/loadbalancer/experimental/testdata/hostport-simple/pod.yaml b/pkg/loadbalancer/experimental/testdata/hostport-simple/pod.yaml
new file mode 100644
index 0000000000000..c5165e2fb202b
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/hostport-simple/pod.yaml
@@ -0,0 +1,52 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  creationTimestamp: "2024-07-10T16:20:42Z"
+  generateName: my-app-85f46c4bd9-
+  labels:
+    pod-template-hash: 85f46c4bd9
+    run: my-app
+  name: my-app-85f46c4bd9-nnk25
+  namespace: default
+  ownerReferences:
+  - apiVersion: apps/v1
+    blockOwnerDeletion: true
+    controller: true
+    kind: ReplicaSet
+    name: my-app-85f46c4bd9
+    uid: 5117de71-d622-44ec-b377-96e3a95e6446
+  resourceVersion: "100491"
+  uid: 1e75ff92-2e9b-4c61-8454-ae81344876d8
+spec:
+  containers:
+  - image: nginx
+    imagePullPolicy: Always
+    name: my-app
+    ports:
+    - containerPort: 80
+      hostPort: 4444
+      protocol: TCP
+    resources: {}
+    terminationMessagePath: /dev/termination-log
+    terminationMessagePolicy: File
+  dnsPolicy: ClusterFirst
+  enableServiceLinks: true
+  nodeName: kind-worker
+  preemptionPolicy: PreemptLowerPriority
+  priority: 0
+  restartPolicy: Always
+  schedulerName: default-scheduler
+  securityContext: {}
+  serviceAccount: default
+  serviceAccountName: default
+  terminationGracePeriodSeconds: 30
+status:
+  hostIP: 172.19.0.3
+  hostIPs:
+  - ip: 172.19.0.3
+  phase: Running
+  podIP: 10.244.1.113
+  podIPs:
+  - ip: 10.244.1.113
+  qosClass: BestEffort
+  startTime: "2024-07-10T16:20:42Z"
diff --git a/pkg/loadbalancer/experimental/testdata/nodeport-simple/endpoints.yaml b/pkg/loadbalancer/experimental/testdata/nodeport-simple/endpoints.yaml
new file mode 100644
index 0000000000000..d5860b552de3f
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/nodeport-simple/endpoints.yaml
@@ -0,0 +1,21 @@
+apiVersion: v1
+kind: Endpoints
+metadata:
+  creationTimestamp: "2022-09-13T11:11:26Z"
+  name: echo
+  namespace: test
+  resourceVersion: "795"
+  uid: 034599d6-0f0a-435e-b8a0-1018520ee741
+subsets:
+- addresses:
+  - ip: 10.244.1.224
+    nodeName: nodeport-worker
+    targetRef:
+      kind: Pod
+      name: echo-757d4cb97f-9gmf7
+      namespace: test
+      uid: 88542b9d-6369-4ec3-a5eb-fd53720013e8
+  ports:
+  - name: http
+    port: 80
+    protocol: TCP
diff --git a/pkg/loadbalancer/experimental/testdata/nodeport-simple/expected.maps b/pkg/loadbalancer/experimental/testdata/nodeport-simple/expected.maps
new file mode 100644
index 0000000000000..47fcf54bd5597
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/nodeport-simple/expected.maps
@@ -0,0 +1,10 @@
+BE: ID=1 ADDR=10.244.1.224:80 STATE=active
+REV: ID=1 ADDR=
+REV: ID=2 ADDR=
+REV: ID=3 ADDR=10.96.50.104:80
+SVC: ID=1 ADDR= SLOT=0 BEID=0 COUNT=1 FLAGS=NodePort+non-routable
+SVC: ID=1 ADDR= SLOT=1 BEID=1 COUNT=0 FLAGS=NodePort+non-routable
+SVC: ID=2 ADDR= SLOT=0 BEID=0 COUNT=1 FLAGS=NodePort
+SVC: ID=2 ADDR= SLOT=1 BEID=1 COUNT=0 FLAGS=NodePort
+SVC: ID=3 ADDR=10.96.50.104:80 SLOT=0 BEID=0 COUNT=1 FLAGS=ClusterIP+non-routable
+SVC: ID=3 ADDR=10.96.50.104:80 SLOT=1 BEID=1 COUNT=0 FLAGS=ClusterIP+non-routable
diff --git a/pkg/loadbalancer/experimental/testdata/nodeport-simple/service.yaml b/pkg/loadbalancer/experimental/testdata/nodeport-simple/service.yaml
new file mode 100644
index 0000000000000..43dd74f71c4c5
--- /dev/null
+++ b/pkg/loadbalancer/experimental/testdata/nodeport-simple/service.yaml
@@ -0,0 +1,29 @@
+apiVersion: v1
+kind: Service
+metadata:
+  creationTimestamp: "2022-09-13T11:11:26Z"
+  name: echo
+  namespace: test
+  resourceVersion: "741"
+  uid: a49fe99c-3564-4754-acc4-780f2331a49b
+spec:
+  clusterIP: 10.96.50.104
+  clusterIPs:
+  - 10.96.50.104
+  externalTrafficPolicy: Cluster
+  internalTrafficPolicy: Cluster
+  ipFamilies:
+  - IPv4
+  ipFamilyPolicy: SingleStack
+  ports:
+  - name: http
+    nodePort: 30781
+    port: 80
+    protocol: TCP
+    targetPort: 80
+  selector:
+    name: echo
+  sessionAffinity: None
+  type: NodePort
+status:
+  loadBalancer: {}