experimental: Add K8s integration tests
These tests are similar in style to test/controlplane: they validate that the
expected BPF map contents can be derived through reconciliation from a given
set of K8s objects expressed as YAML.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
joamaki committed Jul 10, 2024
1 parent 6c7a839 commit 2b6c16c
Showing 8 changed files with 448 additions and 0 deletions.
292 changes: 292 additions & 0 deletions pkg/loadbalancer/experimental/k8s_test.go
@@ -0,0 +1,292 @@
package experimental

import (
	"context"
	"os"
	"path"
	"slices"
	"strings"
	"testing"
	"time"

	"github.com/cilium/hive/cell"
	"github.com/cilium/hive/hivetest"
	"github.com/cilium/statedb"
	"github.com/cilium/statedb/reconciler"
	"github.com/cilium/stream"
	"github.com/stretchr/testify/require"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sRuntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"

	"github.com/cilium/cilium/pkg/datapath/tables"
	"github.com/cilium/cilium/pkg/hive"
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/k8s/resource"
	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
	slim_fake "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned/fake"
	"github.com/cilium/cilium/pkg/maps/lbmap"
	"github.com/cilium/cilium/pkg/metrics"
	"github.com/cilium/cilium/pkg/option"
	"github.com/cilium/cilium/pkg/testutils"
)

var (
	slimDecoder k8sRuntime.Decoder
)

func init() {
	slimScheme := k8sRuntime.NewScheme()
	slim_fake.AddToScheme(slimScheme)
	slimScheme.AddKnownTypes(slim_corev1.SchemeGroupVersion, &metav1.List{})
	slimDecoder = serializer.NewCodecFactory(slimScheme).UniversalDeserializer()
}

func decodeObject[Obj k8sRuntime.Object](t *testing.T, file string) Obj {
	bytes, err := os.ReadFile(file)
	require.NoError(t, err, "ReadFile(%s)", file)
	obj, _, err := slimDecoder.Decode(bytes, nil, nil)
	require.NoError(t, err, "Decode(%s)", file)
	return obj.(Obj)
}

// TODO: EndpointSlices instead
func decodeEndpoints(t *testing.T, file string) *k8s.Endpoints {
	eps := decodeObject[*slim_corev1.Endpoints](t, file)
	return k8s.ParseEndpoints(eps)
}

func readObjects[Obj k8sRuntime.Object](t *testing.T, dataDir string, prefix string) (out []Obj) {
	ents, err := os.ReadDir(dataDir)
	require.NoError(t, err, "ReadDir(%s)", dataDir)

	for _, ent := range ents {
		if strings.HasPrefix(ent.Name(), prefix) && strings.HasSuffix(ent.Name(), ".yaml") {
			out = append(out, decodeObject[Obj](t, path.Join(dataDir, ent.Name())))
		}
	}
	return
}

func upsertEvent[Obj k8sRuntime.Object](obj Obj) resource.Event[Obj] {
	return resource.Event[Obj]{
		Object: obj,
		Kind:   resource.Upsert,
		Done:   func(error) {},
	}
}

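// TestIntegrationK8s runs each testdata/<case> directory through the reflector
// and BPF reconciler and compares the resulting BPF map dump against the
// golden file <case>/expected.maps.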
func TestIntegrationK8s(t *testing.T) {
	testutils.PrivilegedTest(t)

	// TODO: clean up once there's a LBMap cell of sorts. Need
	// support for unpinned maps though...?
	lbmap.Init(lbmap.InitParams{
		IPv4:                     true,
		IPv6:                     true,
		MaxSockRevNatMapEntries:  1000,
		ServiceMapMaxEntries:     1000,
		BackEndMapMaxEntries:     1000,
		RevNatMapMaxEntries:      1000,
		AffinityMapMaxEntries:    1000,
		SourceRangeMapMaxEntries: 1000,
		MaglevMapMaxEntries:      1000,
	})

	log := hivetest.Logger(t)

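	// Each subdirectory of testdata/ is a separate test case containing the
	// input K8s objects as YAML and the expected BPF map contents.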
	dirs, err := os.ReadDir("testdata")
	require.NoError(t, err, "ReadDir(testdata)")

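	// Create the LB BPF maps unpinned so the test does not touch any maps
	// pinned on the host.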
	require.NoError(t, lbmap.Service4MapV2.CreateUnpinned())
	require.NoError(t, lbmap.Service6MapV2.CreateUnpinned())
	require.NoError(t, lbmap.Backend4MapV3.CreateUnpinned())
	require.NoError(t, lbmap.Backend6MapV3.CreateUnpinned())
	require.NoError(t, lbmap.RevNat4Map.CreateUnpinned())
	require.NoError(t, lbmap.RevNat6Map.CreateUnpinned())
	require.NoError(t, lbmap.AffinityMatchMap.CreateUnpinned())

	for _, ent := range dirs {
		if !ent.IsDir() {
			continue
		}

		testDataPath := path.Join("testdata", ent.Name())

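		// Construct the event streams fed to the reflector: a Sync event
		// followed by an Upsert for each object decoded from the test case's
		// YAML files.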
		services :=
			stream.Concat(
				stream.Just(
					resource.Event[*slim_corev1.Service]{
						Kind: resource.Sync,
						Done: func(error) {},
					}),
				stream.Map(
					stream.FromSlice(readObjects[*slim_corev1.Service](t, testDataPath, "service")),
					upsertEvent))

		pods :=
			stream.Concat(
				stream.Just(
					resource.Event[*slim_corev1.Pod]{
						Kind: resource.Sync,
						Done: func(error) {},
					}),
				stream.Map(
					stream.FromSlice(readObjects[*slim_corev1.Pod](t, testDataPath, "pod")),
					upsertEvent))

		endpoints :=
			stream.Concat(
				stream.Just(
					resource.Event[*k8s.Endpoints]{
						Kind: resource.Sync,
						Done: func(error) {},
					}),
				stream.Map(
					stream.Map(
						stream.FromSlice(readObjects[*slim_corev1.Endpoints](t, testDataPath, "endpoints")),
						k8s.ParseEndpoints,
					),
					upsertEvent))

		var (
			writer *Writer
			db     *statedb.DB
		)

		// TODO: Rework the test to use one hive instance. Service/Backend ID allocation needs to be reset though?
		h := hive.New(
			// FIXME. Need this to avoid 1 second delay on metric operations.
			// Figure out a better way to deal with this.
			metrics.Cell,
			cell.Provide(func() *option.DaemonConfig {
				return &option.DaemonConfig{}
			}),

			cell.Module(
				"loadbalancer-test",
				"Test module",

				cell.Config(DefaultConfig),

				cell.Provide(func() streamsOut {
					return streamsOut{
						ServicesStream:  services,
						EndpointsStream: endpoints,
						PodsStream:      pods,
					}
				}),

				cell.Invoke(func(db_ *statedb.DB, w *Writer) {
					db = db_
					writer = w
				}),

				// Provides [Writer] API and the load-balancing tables.
				TablesCell,

				// Reflects Kubernetes services and endpoints to the load-balancing tables
				// using the [Writer].
				ReflectorCell,

				// Reconcile tables to BPF maps
				BPFReconcilerCell,

				cell.Provide(
					tables.NewNodeAddressTable,
					statedb.RWTable[tables.NodeAddress].ToTable,
				),
				cell.Invoke(func(db *statedb.DB, nodeAddrs statedb.RWTable[tables.NodeAddress]) {
					db.RegisterTable(nodeAddrs)
					txn := db.WriteTxn(nodeAddrs)
					nodeAddrs.Insert(
						txn,
						tables.NodeAddress{
							Addr:       nodePortAddrs[0],
							NodePort:   true,
							Primary:    true,
							DeviceName: "eth0",
						},
					)
					nodeAddrs.Insert(
						txn,
						tables.NodeAddress{
							Addr:       nodePortAddrs[1],
							NodePort:   true,
							Primary:    true,
							DeviceName: "eth0",
						},
					)
					txn.Commit()
				}),
			),
		)

		hive.AddConfigOverride(h, func(cfg *Config) {
			cfg.EnableExperimentalLB = true
		})

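		// Start the hive: the reflector consumes the event streams into the
		// load-balancing tables and the reconciler pushes the table contents
		// into the BPF maps.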
		require.NoError(t, h.Start(log, context.TODO()))

		// Wait for reconciliation.
		timeout := time.After(5 * time.Second)
		for {
			iter, watch := writer.Frontends().AllWatch(db.ReadTxn())
			allDone := true
			count := 0
			for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
				if obj.Status.Kind != reconciler.StatusKindDone {
					allDone = false
				}
				count++
			}
			if count > 0 && allDone {
				break
			}
			select {
			case <-timeout:
				writer.DebugDump(db.ReadTxn(), os.Stdout)
				t.Fatalf("TIMEOUT")
			case <-watch:
			}
		}

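		// Compare the dump of the BPF maps against the golden file
		// expected.maps; on mismatch, write the actual dump next to it for
		// inspection.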
		expectedData, err := os.ReadFile(path.Join(testDataPath, "expected.maps"))
		if err != nil {
			panic(err)
		}
		expected := strings.Split(strings.TrimSpace(string(expectedData)), "\n")
		actual := dump(frontendAddrs[0])

		if !slices.Equal(expected, actual) {
			// TODO: Nicer diff
			t.Errorf("Mismatching BPF map contents:\nexpected:\n%s\nactual:\n%s",
				strings.Join(expected, "\n"),
				strings.Join(actual, "\n"),
			)
			actualPath := path.Join(testDataPath, "actual.maps")
			os.WriteFile(
				actualPath,
				[]byte(strings.Join(actual, "\n")+"\n"),
				0644,
			)
			t.Logf("Wrote actual BPF map dump to %s", actualPath)

			writer.DebugDump(db.ReadTxn(), os.Stdout) // TODO: also dump this to files?
		}

		h.Stop(log, context.TODO())

		// FIXME: Clean up by deleting the k8s objects instead!

		require.NoError(t, lbmap.Service4MapV2.DeleteAll())
		require.NoError(t, lbmap.Service6MapV2.DeleteAll())
		require.NoError(t, lbmap.Backend4MapV3.DeleteAll())
		require.NoError(t, lbmap.Backend6MapV3.DeleteAll())
		require.NoError(t, lbmap.RevNat4Map.DeleteAll())
		require.NoError(t, lbmap.RevNat6Map.DeleteAll())
		require.NoError(t, lbmap.AffinityMatchMap.DeleteAll())
	}
}
1 change: 1 addition & 0 deletions pkg/loadbalancer/experimental/testdata/.gitignore
@@ -0,0 +1 @@
*/actual.maps
@@ -0,0 +1,7 @@
BE: ID=1 ADDR=10.244.1.113:80 STATE=active
REV: ID=1 ADDR=<zero>
REV: ID=2 ADDR=<nodePort>
SVC: ID=1 ADDR=<zero> SLOT=0 BEID=0 COUNT=1 FLAGS=HostPort+non-routable
SVC: ID=1 ADDR=<zero> SLOT=1 BEID=1 COUNT=0 FLAGS=HostPort+non-routable
SVC: ID=2 ADDR=<nodePort> SLOT=0 BEID=0 COUNT=1 FLAGS=HostPort
SVC: ID=2 ADDR=<nodePort> SLOT=1 BEID=1 COUNT=0 FLAGS=HostPort
36 changes: 36 additions & 0 deletions pkg/loadbalancer/experimental/testdata/hostport-simple/nginx.yaml
@@ -0,0 +1,36 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
spec:
  selector:
    matchLabels:
      run: my-app
  replicas: 1
  template:
    metadata:
      labels:
        run: my-app
    spec:
      containers:
      - name: my-app
        image: nginx
        ports:
        - containerPort: 80
          hostPort: 4444

---

apiVersion: v1
kind: Service
metadata:
  name: my-app
  labels:
    run: my-app
spec:
  type: NodePort
  ports:
  - port: 80
    protocol: TCP
  selector:
    run: my-app
52 changes: 52 additions & 0 deletions pkg/loadbalancer/experimental/testdata/hostport-simple/pod.yaml
@@ -0,0 +1,52 @@
apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: "2024-07-10T16:20:42Z"
  generateName: my-app-85f46c4bd9-
  labels:
    pod-template-hash: 85f46c4bd9
    run: my-app
  name: my-app-85f46c4bd9-nnk25
  namespace: default
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: my-app-85f46c4bd9
    uid: 5117de71-d622-44ec-b377-96e3a95e6446
  resourceVersion: "100491"
  uid: 1e75ff92-2e9b-4c61-8454-ae81344876d8
spec:
  containers:
  - image: nginx
    imagePullPolicy: Always
    name: my-app
    ports:
    - containerPort: 80
      hostPort: 4444
      protocol: TCP
    resources: {}
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: kind-worker
  preemptionPolicy: PreemptLowerPriority
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
  terminationGracePeriodSeconds: 30
status:
  hostIP: 172.19.0.3
  hostIPs:
  - ip: 172.19.0.3
  phase: Running
  podIP: 10.244.1.113
  podIPs:
  - ip: 10.244.1.113
  qosClass: BestEffort
  startTime: "2024-07-10T16:20:42Z"