diff --git a/build/charts/antrea/crds/packetcapture.yaml b/build/charts/antrea/crds/packetcapture.yaml index aee36ce85c1..ee5288e1ddd 100644 --- a/build/charts/antrea/crds/packetcapture.yaml +++ b/build/charts/antrea/crds/packetcapture.yaml @@ -39,6 +39,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/charts/flow-aggregator/templates/deployment.yaml b/build/charts/flow-aggregator/templates/deployment.yaml index 7c2bad356cc..5bbb7bcd824 100644 --- a/build/charts/flow-aggregator/templates/deployment.yaml +++ b/build/charts/flow-aggregator/templates/deployment.yaml @@ -12,6 +12,10 @@ spec: app: flow-aggregator template: metadata: + annotations: + # Automatically restart Pod if the ConfigMap changes + # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments + checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }} labels: app: flow-aggregator spec: diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index a96c3e3bf09..1c75d32613b 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2954,6 +2954,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index 96c8ae3dc13..4fa419048be 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -2927,6 +2927,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 47e72508caf..cddf6f90c81 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2954,6 +2954,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 5f3e1709f95..eb44cd80531 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2954,6 +2954,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 0030e66f722..4e45d8d8fe0 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2954,6 +2954,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 6b0a4c9cb1b..fe80a312053 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2954,6 +2954,23 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.numberCaptured + description: Number of packets captured so far. + name: Captured-Packets + type: integer + - jsonPath: .status.conditions[?(@.type=="PacketCaptureStarted")].status + description: Whether the capture has been started. + name: Started + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureComplete")].status + description: Whether the capture has completed. + name: Complete + type: string + - jsonPath: .status.conditions[?(@.type=="PacketCaptureFileUploaded")].status + description: Whether the capture file has been uploaded to the file server. + name: Uploaded + type: string + priority: 10 schema: openAPIV3Schema: type: object diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index f2f35db1b52..caec1584de2 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -400,6 +400,8 @@ spec: app: flow-aggregator template: metadata: + annotations: + checksum/config: 5ba1a6d1b9d3b40e2ea26e37aa2bea38fda2558c20564873936472136651de37 labels: app: flow-aggregator spec: diff --git a/go.mod b/go.mod index 7499cba0895..c365e9de14a 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/google/btree v1.1.3 github.com/google/uuid v1.6.0 - github.com/gopacket/gopacket v1.3.1 + github.com/gopacket/gopacket v1.3.2-0.20241202175635-b43272ae1eb8 github.com/hashicorp/memberlist v0.5.3 github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.3.0 github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible @@ -42,7 +42,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/onsi/ginkgo/v2 v2.22.2 github.com/onsi/gomega v1.36.2 - github.com/osrg/gobgp/v3 v3.33.0 + github.com/osrg/gobgp/v3 v3.34.0 github.com/pkg/sftp v1.13.7 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/common v0.62.0 @@ -56,11 +56,11 @@ require ( github.com/vmware/go-ipfix v0.12.0 go.uber.org/mock v0.5.0 golang.org/x/crypto v0.32.0 - golang.org/x/mod v0.22.0 + golang.org/x/mod v0.23.0 golang.org/x/net v0.34.0 - golang.org/x/sync v0.10.0 - golang.org/x/sys v0.29.0 - golang.org/x/time v0.9.0 + golang.org/x/sync v0.11.0 + golang.org/x/sys v0.30.0 + golang.org/x/time v0.10.0 golang.org/x/tools v0.29.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5 google.golang.org/grpc v1.70.0 diff --git a/go.sum b/go.sum index 209603993ba..7e1c1cfbe20 100644 --- a/go.sum +++ b/go.sum @@ -383,8 +383,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/googleapis/gnostic v0.1.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU= -github.com/gopacket/gopacket v1.3.1 h1:ZppWyLrOJNZPe5XkdjLbtuTkfQoxQ0xyMJzQCqtqaPU= -github.com/gopacket/gopacket v1.3.1/go.mod h1:3I13qcqSpB2R9fFQg866OOgzylYkZxLTmkvcXhvf6qg= +github.com/gopacket/gopacket v1.3.2-0.20241202175635-b43272ae1eb8 h1:PoilRl1aPz9JlypuskS97qoGuXbEGBGza7YmXQyAwP8= +github.com/gopacket/gopacket v1.3.2-0.20241202175635-b43272ae1eb8/go.mod h1:3I13qcqSpB2R9fFQg866OOgzylYkZxLTmkvcXhvf6qg= github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= @@ -608,8 +608,8 @@ github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= -github.com/osrg/gobgp/v3 v3.33.0 h1:G8NlY1gzz0DOfiwfiYv2++vWpPLm+CMAKYRVzSmaJow= -github.com/osrg/gobgp/v3 v3.33.0/go.mod h1:8m+kgkdaWrByxg5EWpNUO2r/mopodrNBOUBhMnW/yGQ= +github.com/osrg/gobgp/v3 v3.34.0 h1:DDIWsAIE7j1dwhSV3tGsTKs9OO8MTOS4atErebZxTtA= +github.com/osrg/gobgp/v3 v3.34.0/go.mod h1:l2nPaHaLmIoKbFxMUzKon/h6c9BTzCp5zJI9Dhnrx5c= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/paulmach/orb v0.8.0 h1:W5XAt5yNPNnhaMNEf0xNSkBMJ1LzOzdk2MRlB6EN0Vs= @@ -878,8 +878,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= -golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= +golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -936,8 +936,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -999,8 +999,8 @@ golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1028,8 +1028,8 @@ golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 776b80ac829..bbba8738472 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -445,10 +445,6 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain var podWg sync.WaitGroup for _, pod := range pods { - // Skip Pods for which we are not in charge of the networking. - if pod.Spec.HostNetwork { - continue - } desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name)) for _, podIP := range pod.Status.PodIPs { desiredPodIPs.Insert(podIP.IP) diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 253c9ec2065..53b216d2a97 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -29,7 +29,6 @@ import ( "github.com/containernetworking/cni/pkg/version" "github.com/containernetworking/plugins/pkg/ip" "google.golang.org/grpc" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -756,14 +755,15 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, // reconcile performs startup reconciliation for the CNI server. The CNI server is in charge of // installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the // K8s apiserver and replay the necessary flows. +// The Pods are processed in reconcile as below, +// | Pod Type | Spec.HostNetwork | windowsOptions.hostProcess | OVS interface needed? | List Pods in reconcile | +// | Normal Pod (non-HostNetwork) | false | false or N/A | Yes | Yes | +// | Linux HostNetwork Pod | true | N/A | No | No | +// | Windows HostNetwork Pod | true | false | Yes | Yes | +// | Windows HostProcess Pod | true | true | No | Yes | func (s *CNIServer) reconcile() error { klog.InfoS("Starting reconciliation for CNI server") - // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from - // the watch cache in kube-apiserver. - pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ - FieldSelector: "spec.nodeName=" + s.nodeConfig.Name, - ResourceVersion: "0", - }) + pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions()) if err != nil { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } diff --git a/pkg/agent/cniserver/server_linux.go b/pkg/agent/cniserver/server_linux.go index 0e21a557940..9f3f8db76b1 100644 --- a/pkg/agent/cniserver/server_linux.go +++ b/pkg/agent/cniserver/server_linux.go @@ -14,7 +14,12 @@ package cniserver -import current "github.com/containernetworking/cni/pkg/types/100" +import ( + "fmt" + + current "github.com/containernetworking/cni/pkg/types/100" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) // updateResultDNSConfig updates the DNS config from CNIConfig. func updateResultDNSConfig(result *current.Result, cniConfig *CNIConfig) { @@ -48,3 +53,13 @@ func validateRuntime(netNS string) error { func (c *CNIConfig) getInfraContainer() string { return c.ContainerId } + +// getPodsListOptions returns the none host-network Pods running on the current Node. +func (s *CNIServer) getPodsListOptions() metav1.ListOptions { + return metav1.ListOptions{ + FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name), + // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from + // the watch cache in kube-apiserver. + ResourceVersion: "0", + } +} diff --git a/pkg/agent/cniserver/server_windows.go b/pkg/agent/cniserver/server_windows.go index 794c10e1d66..b45b5587ca9 100644 --- a/pkg/agent/cniserver/server_windows.go +++ b/pkg/agent/cniserver/server_windows.go @@ -22,6 +22,7 @@ import ( "strings" current "github.com/containernetworking/cni/pkg/types/100" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" ) @@ -98,3 +99,15 @@ func getInfraContainer(containerID, netNS string) string { func (c *CNIConfig) getInfraContainer() string { return getInfraContainer(c.ContainerId, c.Netns) } + +// getPodsListOptions returns the Pods running on the current Node. Note, the host-network Pods are not filtered +// out on Windows because they are also managed by antrea as long as "spec.SecurityContext.windowsOptions.hostProcess" +// is not configured. +func (s *CNIServer) getPodsListOptions() metav1.ListOptions { + return metav1.ListOptions{ + FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name), + // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from + // the watch cache in kube-apiserver. + ResourceVersion: "0", + } +} diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 636d399ae71..42eb7bc5048 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -46,6 +46,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" + utilip "antrea.io/antrea/pkg/util/ip" ) var ( @@ -53,6 +54,22 @@ var ( dnsSearches = []string{"a.b.c.d"} mockWinnet *winnettest.MockInterface + + interfaceForHostNetworkPod = &interfacestore.InterfaceConfig{ + InterfaceName: "iface2", + Type: interfacestore.ContainerInterface, + IPs: []net.IP{net.ParseIP("1.1.1.2")}, + MAC: utilip.MustParseMAC("00:11:22:33:44:02"), + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(4), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: pod2.Name, + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } ) func TestUpdateResultDNSConfig(t *testing.T) { @@ -732,7 +749,7 @@ func TestReconcile(t *testing.T) { cniServer := newCNIServer(t) cniServer.routeClient = mockRoute cniServer.kubeClient = kubeClient - for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { + for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface, interfaceForHostNetworkPod} { ifaceStore.AddInterface(containerIface) } waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh) @@ -741,11 +758,19 @@ func TestReconcile(t *testing.T) { go cniServer.podConfigurator.Run(stopCh) // Re-install Pod1 flows - podFlowsInstalled := make(chan string, 2) + expReinstalledPodCount := 3 + podFlowsInstalled := make(chan string, expReinstalledPodCount) mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil). Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { podFlowsInstalled <- interfaceName }).Times(1) + + // Re-install host-network Pod (Pod2) flows + mockOFClient.EXPECT().InstallPodFlows(interfaceForHostNetworkPod.InterfaceName, interfaceForHostNetworkPod.IPs, interfaceForHostNetworkPod.MAC, uint32(interfaceForHostNetworkPod.OFPort), uint16(0), nil). + Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { + podFlowsInstalled <- interfaceName + }).Times(1) + // Uninstall Pod3 flows which is deleted. mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1) mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1) @@ -778,7 +803,7 @@ func TestReconcile(t *testing.T) { assert.NoError(t, err) _, exists := ifaceStore.GetInterfaceByName("iface3") assert.False(t, exists) - for i := 0; i < 2; i++ { + for i := 0; i < expReinstalledPodCount; i++ { select { case <-podFlowsInstalled: case <-time.After(500 * time.Millisecond): diff --git a/pkg/antctl/raw/supportbundle/command.go b/pkg/antctl/raw/supportbundle/command.go index d5787929dd2..6feb9dfd2e3 100644 --- a/pkg/antctl/raw/supportbundle/command.go +++ b/pkg/antctl/raw/supportbundle/command.go @@ -33,10 +33,14 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "gopkg.in/yaml.v2" + + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" + utilerror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -47,6 +51,8 @@ import ( systemv1beta1 "antrea.io/antrea/pkg/apis/system/v1beta1" antrea "antrea.io/antrea/pkg/client/clientset/versioned" systemclientset "antrea.io/antrea/pkg/client/clientset/versioned/typed/system/v1beta1" + "antrea.io/antrea/pkg/util/compress" + "antrea.io/antrea/pkg/util/k8s" ) const ( @@ -581,6 +587,20 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to create clientset: %w", err) } + if err := os.MkdirAll(option.dir, 0700); err != nil { + return fmt.Errorf("error when creating output dir: %w", err) + } + + f, err := os.Create(filepath.Join(option.dir, "clusterinfo")) + if err != nil { + return err + } + defer f.Close() + err = getClusterInfo(f, k8sClientset) + if err != nil { + return err + } + var controllerClient systemclientset.SupportBundleInterface var agentClients map[string]systemclientset.SupportBundleInterface @@ -625,9 +645,6 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("no matched Nodes found to collect agent bundles") } - if err := os.MkdirAll(option.dir, 0700|os.ModeDir); err != nil { - return fmt.Errorf("error when creating output dir: %w", err) - } amount := len(agentClients) * 2 if controllerClient != nil { amount += 2 @@ -635,19 +652,10 @@ func controllerRemoteRunE(cmd *cobra.Command, args []string) error { bar := barTmpl.Start(amount) defer bar.Finish() defer bar.Set("prefix", "Finish ") - f, err := os.Create(filepath.Join(option.dir, "clusterinfo")) - if err != nil { - return err - } - defer f.Close() - err = getClusterInfo(f, k8sClientset) - if err != nil { - return err - } results := requestAll(ctx, agentClients, controllerClient, bar) results = downloadAll(ctx, agentClients, controllerClient, dir, bar, results) - return processResults(results, dir) + return processResults(ctx, antreaClientset, k8sClientset, results, dir) } func genErrorMsg(resultMap map[string]error) string { @@ -659,8 +667,9 @@ func genErrorMsg(resultMap map[string]error) string { } // processResults will output the failed nodes and their reasons if any. If no data was collected, -// error is returned, otherwise will return nil. -func processResults(resultMap map[string]error, dir string) error { +// error is returned, otherwise will return nil. For failed nodes and controller, will also trying to get logs from +// kubernetes api. +func processResults(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, resultMap map[string]error, dir string) error { resultStr := "" var failedNodes []string allFailed := true @@ -676,7 +685,8 @@ func processResults(resultMap map[string]error, dir string) error { } } - if resultMap[""] != nil { + controllerFailed := resultMap[""] != nil + if controllerFailed { fmt.Println("Controller Info Failed Reason: " + resultMap[""].Error()) } @@ -689,9 +699,163 @@ func processResults(resultMap map[string]error, dir string) error { err = writeFailedNodes(dir, failedNodes) } + // download logs from kubernetes api + if failedNodes != nil { + if err = downloadFallbackAgentBundleFromKubernetes(ctx, antreaClientset, k8sClient, failedNodes, dir); err != nil { + fmt.Println("Failed to download agent bundle from kubernetes api: " + err.Error()) + } else { + allFailed = false + } + } + if controllerFailed { + if err = downloadFallbackControllerBundleFromKubernetes(ctx, antreaClientset, k8sClient, dir); err != nil { + fmt.Println("Failed to download controller bundle from kubernetes api: " + err.Error()) + } else { + allFailed = false + } + } + if allFailed { return fmt.Errorf("no data was collected: %s", genErrorMsg(resultMap)) } else { return err } } + +func downloadFallbackControllerBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, dir string) error { + tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_") + if err != nil { + return err + } + defer defaultFS.RemoveAll(tmpDir) + + var podRef *corev1.ObjectReference + if err := func() error { + controllerInfo, err := antreaClientset.CrdV1beta1().AntreaControllerInfos().Get(ctx, v1beta1.AntreaControllerInfoResourceName, metav1.GetOptions{}) + if err != nil { + return err + } + podRef = &controllerInfo.PodRef + data, err := yaml.Marshal(controllerInfo) + if err != nil { + return err + } + if err := afero.WriteFile(defaultFS, filepath.Join(dir, "controllerinfo"), data, 0644); err != nil { + return err + } + return nil + }(); err != nil { + return err + } + if podRef == nil { + return fmt.Errorf("no podRef found in AntreaControllerInfo") + } + pod, err := k8sClient.CoreV1().Pods(podRef.Namespace).Get(ctx, podRef.Name, metav1.GetOptions{}) + if err != nil { + return err + } + if err := downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(pod), tmpDir); err != nil { + return err + } + return packPodBundle(pod, dir, tmpDir) +} + +func downloadFallbackAgentBundleFromKubernetes(ctx context.Context, antreaClientset antrea.Interface, k8sClient kubernetes.Interface, failedNodes []string, dir string) error { + agentInfoList, err := antreaClientset.CrdV1beta1().AntreaAgentInfos().List(ctx, metav1.ListOptions{ResourceVersion: "0"}) + if err != nil { + return err + } + + agentInfoMap := map[string]v1beta1.AntreaAgentInfo{} + for _, agentInfo := range agentInfoList.Items { + agentInfoMap[agentInfo.Name] = agentInfo + } + pods, err := k8sClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{ + ResourceVersion: "0", + LabelSelector: "app=antrea,component=antrea-agent", + }) + if err != nil { + return err + } + failedNodeSet := sets.NewString(failedNodes...) + var errors []error + for _, pod := range pods.Items { + if !failedNodeSet.Has(pod.Spec.NodeName) { + continue + } + if err := func() error { + tmpDir, err := afero.TempDir(defaultFS, "", "bundle_tmp_") + if err != nil { + return err + } + defer defaultFS.RemoveAll(tmpDir) + if agentInfo, ok := agentInfoMap[pod.Spec.NodeName]; ok { + data, err := yaml.Marshal(agentInfo) + if err != nil { + return err + } + if err = afero.WriteFile(defaultFS, filepath.Join(tmpDir, "agentinfo"), data, 0644); err != nil { + return err + } + } + err = downloadPodLogs(ctx, k8sClient, pod.Namespace, pod.Name, k8s.GetPodContainerNames(&pod), tmpDir) + if err != nil { + return err + } + return packPodBundle(&pod, dir, tmpDir) + }(); err != nil { + errors = append(errors, err) + } + } + return utilerror.NewAggregate(errors) +} + +func packPodBundle(pod *corev1.Pod, dir string, bundleDir string) error { + prefix := "agent_" + if strings.Contains(pod.Name, "controller") { + prefix = "controller_" + } + gzFileName := filepath.Join(dir, prefix+pod.Spec.NodeName+".tar.gz") + f, err := defaultFS.Create(gzFileName) + if err != nil { + return err + } + defer f.Close() + _, err = compress.PackDir(defaultFS, bundleDir, f) + return err +} + +func downloadPodLogs(ctx context.Context, k8sClient kubernetes.Interface, namespace string, podName string, containers []string, dir string) error { + downloadContainerLogs := func(containerName string) error { + containerDirName, _ := strings.CutPrefix(containerName, "antrea-") + containerLogDir := filepath.Join(dir, "logs", containerDirName) + err := os.MkdirAll(containerLogDir, 0755) + if err != nil { + return err + } + fileName := filepath.Join(containerLogDir, containerName+".log") + f, err := defaultFS.Create(fileName) + if err != nil { + return err + } + defer f.Close() + logOption := &corev1.PodLogOptions{ + Container: containerName, + } + logs := k8sClient.CoreV1().Pods(namespace).GetLogs(podName, logOption) + logStream, err := logs.Stream(ctx) + if err != nil { + return err + } + + if _, err = io.Copy(f, logStream); err != nil { + return err + } + return logStream.Close() + } + var errors []error + for _, containerName := range containers { + errors = append(errors, downloadContainerLogs(containerName)) + } + return utilerror.NewAggregate(errors) +} diff --git a/pkg/antctl/raw/supportbundle/command_test.go b/pkg/antctl/raw/supportbundle/command_test.go index cbca643e7f2..5d7ae2ec61d 100644 --- a/pkg/antctl/raw/supportbundle/command_test.go +++ b/pkg/antctl/raw/supportbundle/command_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import ( "errors" "fmt" "path/filepath" + "strings" "testing" "time" @@ -41,6 +42,7 @@ import ( fakeclientset "antrea.io/antrea/pkg/client/clientset/versioned/fake" "antrea.io/antrea/pkg/client/clientset/versioned/scheme" systemclientset "antrea.io/antrea/pkg/client/clientset/versioned/typed/system/v1beta1" + "antrea.io/antrea/pkg/util/compress" ) var ( @@ -59,6 +61,10 @@ var ( Kind: "Node", Name: "node-1", }, + PodRef: v1.ObjectReference{ + Name: "antrea-controller-1", + Namespace: "kube-system", + }, } node1 = v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -76,7 +82,11 @@ var ( } node2 = v1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "node-1", + Name: "node-1", + ResourceVersion: "0", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{}, }, } node3 = v1.Node{ @@ -95,7 +105,7 @@ var ( } agentInfo1 = &v1beta1.AntreaAgentInfo{ ObjectMeta: metav1.ObjectMeta{ - Name: "antrea-agent-1", + Name: "node-1", }, APIPort: 0, PodRef: v1.ObjectReference{ @@ -108,7 +118,7 @@ var ( } agentInfo2 = &v1beta1.AntreaAgentInfo{ ObjectMeta: metav1.ObjectMeta{ - Name: "antrea-agent-2", + Name: "node-2", }, APIPort: 0, PodRef: v1.ObjectReference{ @@ -119,6 +129,63 @@ var ( Name: "node-3", }, } + controllerPod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "antrea-controller-1", + Namespace: "kube-system", + Labels: map[string]string{ + "app": "antrea", + "component": "antrea-controller", + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Containers: []v1.Container{ + { + Name: "antrea-controller", + }, + }, + }, + } + pod1 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "antrea-agent-1", + Namespace: "kube-system", + Labels: map[string]string{ + "app": "antrea", + "component": "antrea-agent", + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Containers: []v1.Container{ + { + Name: "antrea-agent", + }, + { + Name: "antrea-ovs", + }, + }, + }, + } + pod2 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "antrea-agent-2", + Namespace: "kube-system", + Labels: map[string]string{ + "app": "antrea", + "component": "antrea-agent", + }, + }, + Spec: v1.PodSpec{ + NodeName: "node-2", + Containers: []v1.Container{ + { + Name: "antrea-agent", + }, + }, + }, + } nameList = []string{"node-1", "node-3"} ) @@ -320,9 +387,9 @@ func TestProcessResults(t *testing.T) { option.dir = path }() tests := []struct { - name string - resultMap map[string]error - expectedErr string + name string + resultMap map[string]error + expectFileList map[string][]string }{ { name: "All nodes failed", @@ -331,7 +398,20 @@ func TestProcessResults(t *testing.T) { "node-1": fmt.Errorf("error-1"), "node-2": fmt.Errorf("error-2"), }, - expectedErr: "no data was collected:", + expectFileList: map[string][]string{ + "": { + filepath.Join("logs", "controller", "antrea-controller.log"), + }, + "node-1": { + "agentinfo", + filepath.Join("logs", "ovs", "antrea-ovs.log"), + filepath.Join("logs", "agent", "antrea-agent.log"), + }, + "node-2": { + "agentinfo", + filepath.Join("logs", "agent", "antrea-agent.log"), + }, + }, }, { name: "Not all nodes failed", @@ -340,28 +420,59 @@ func TestProcessResults(t *testing.T) { "node-1": fmt.Errorf("error-1"), "node-2": nil, }, + expectFileList: map[string][]string{ + "": { + filepath.Join("logs", "controller", "antrea-controller.log"), + }, + "node-1": { + "agentinfo", + filepath.Join("logs", "ovs", "antrea-ovs.log"), + filepath.Join("logs", "agent", "antrea-agent.log"), + }, + }, }, } + defaultFS = afero.NewMemMapFs() + defer func() { + defaultFS = afero.NewOsFs() + option.dir = "" + }() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - defaultFS = afero.NewMemMapFs() + option.dir = strings.ReplaceAll(tt.name, " ", "-") defaultFS.MkdirAll(option.dir, 0755) - defer func() { - defaultFS = afero.NewOsFs() - }() - - err := processResults(tt.resultMap, option.dir) - if tt.expectedErr != "" { - require.ErrorContains(t, err, tt.expectedErr) - } else { - require.NoError(t, err) - } - // Both test cases above have failed Nodes, hence this file should always be created/ + antreaInterface := fakeclientset.NewSimpleClientset(&controllerInfo, agentInfo1, agentInfo2) + k8sClient := fake.NewSimpleClientset(controllerPod, pod1, pod2) + require.NoError(t, processResults(context.TODO(), antreaInterface, k8sClient, tt.resultMap, option.dir)) b, err := afero.ReadFile(defaultFS, filepath.Join(option.dir, "failed_nodes")) require.NoError(t, err) data := string(b) + ok, checkErr := afero.Exists(defaultFS, filepath.Join(option.dir, "controllerinfo")) + require.NoError(t, checkErr) + assert.True(t, ok) + for node, err := range tt.resultMap { + tgzFileName := fmt.Sprintf("agent_%s.tar.gz", node) + if node == "" { + tgzFileName = "controller_node-1.tar.gz" + } + if err != nil { + // fallback path to retrieve data from kubernetes API instead of Antrea API. + ok, checkErr := afero.Exists(defaultFS, filepath.Join(option.dir, tgzFileName)) + require.NoError(t, checkErr) + require.True(t, ok, "expected bundle file %s not found", tgzFileName) + + unpackError := compress.UnpackDir(defaultFS, filepath.Join(option.dir, tgzFileName), option.dir) + require.NoError(t, unpackError) + files, _ := tt.expectFileList[node] + for _, expectFileName := range files { + ok, checkErr = afero.Exists(defaultFS, filepath.Join(option.dir, expectFileName)) + require.NoError(t, checkErr) + assert.True(t, ok, "expected bundle file %s for %s not found", expectFileName, node) + } + + } if node == "" { continue } diff --git a/pkg/controller/grouping/group_entity_index_test.go b/pkg/controller/grouping/group_entity_index_test.go index 89342df074e..bd8eb28ef31 100644 --- a/pkg/controller/grouping/group_entity_index_test.go +++ b/pkg/controller/grouping/group_entity_index_test.go @@ -550,13 +550,17 @@ func TestGroupEntityIndexEventHandlers(t *testing.T) { } tt.inputEvent(index) - time.Sleep(100 * time.Millisecond) - lock.Lock() - defer lock.Unlock() - assert.Equal(t, len(tt.expectedGroupsCalled), len(actualGroupsCalled)) - for groupType, expected := range tt.expectedGroupsCalled { - assert.ElementsMatch(t, expected, actualGroupsCalled[groupType]) - } + assert.EventuallyWithT(t, func(t *assert.CollectT) { + lock.Lock() + defer lock.Unlock() + if !assert.Equal(t, len(tt.expectedGroupsCalled), len(actualGroupsCalled)) { + // If the lengths don't match, don't bother checking the contents, return early. + return + } + for groupType, expected := range tt.expectedGroupsCalled { + assert.ElementsMatch(t, expected, actualGroupsCalled[groupType]) + } + }, 1*time.Second, 50*time.Millisecond) }) } } diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 0fcc6259391..b5eab8f24d0 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -217,6 +217,7 @@ func NewFlowAggregator( if opt.Config.FlowCollector.Enable { fa.ipfixExporter = newIPFIXExporter(clusterUUID, opt, registry) } + klog.InfoS("FlowAggregator initialized", "mode", opt.AggregatorMode) return fa, nil } @@ -901,6 +902,13 @@ func (fa *flowAggregator) handleWatcherEvent() error { } func (fa *flowAggregator) updateFlowAggregator(opt *options.Options) { + // If user tries to change the mode dynamically, it makes sense to error out immediately and + // ignore other updates, as this is such a major configuration parameter. + // Unsupported "minor" updates are handled at the end of this function. + if opt.AggregatorMode != fa.aggregatorMode { + klog.ErrorS(nil, "FlowAggregator mode cannot be changed without restarting") + return + } if opt.Config.FlowCollector.Enable { if fa.ipfixExporter == nil { klog.InfoS("Enabling Flow-Collector") diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 40b6a859ac7..8b405a555f6 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -753,6 +753,7 @@ func TestFlowAggregator_Run(t *testing.T) { makeOptions := func(config *flowaggregatorconfig.FlowAggregatorConfig) *options.Options { return &options.Options{ + AggregatorMode: flowAggregator.aggregatorMode, ActiveFlowRecordTimeout: flowAggregator.activeFlowRecordTimeout, Config: config, } diff --git a/pkg/util/compress/compress.go b/pkg/util/compress/compress.go index 3567c5d6faf..0991bd90be7 100644 --- a/pkg/util/compress/compress.go +++ b/pkg/util/compress/compress.go @@ -18,6 +18,8 @@ import ( "archive/tar" "compress/gzip" "crypto/sha256" + "errors" + "fmt" "io" "os" "path/filepath" @@ -26,6 +28,68 @@ import ( "github.com/spf13/afero" ) +// Sanitize archive file pathing from "G305: Zip Slip vulnerability" +func sanitizeArchivePath(d, t string) (string, error) { + v := filepath.Join(d, t) + if strings.HasPrefix(v, filepath.Clean(d)) { + return v, nil + } + return "", fmt.Errorf("%s: %s", "content filepath is tainted", t) +} + +func UnpackDir(fs afero.Fs, fileName string, targetDir string) error { + file, err := fs.Open(fileName) + if err != nil { + return err + } + defer file.Close() + + reader, err := gzip.NewReader(file) + if err != nil { + return err + } + defer reader.Close() + tarReader := tar.NewReader(reader) + + for true { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + return err + } + targetPath, err := sanitizeArchivePath(targetDir, header.Name) + if err != nil { + return err + } + switch header.Typeflag { + case tar.TypeDir: + if err := fs.Mkdir(targetPath, 0755); err != nil { + return err + } + case tar.TypeReg: + outFile, err := fs.Create(targetPath) + defer outFile.Close() + if err != nil { + return err + } + for { + // to resolve G110: Potential DoS vulnerability via decompression bomb + if _, err := io.CopyN(outFile, tarReader, 1024); err != nil { + if err == io.EOF { + break + } + return err + } + } + default: + return errors.New("unknown type found when reading tgz file") + } + } + return nil +} + func PackDir(fs afero.Fs, dir string, writer io.Writer) ([]byte, error) { hash := sha256.New() gzWriter := gzip.NewWriter(io.MultiWriter(hash, writer)) diff --git a/pkg/util/k8s/pod.go b/pkg/util/k8s/pod.go index f14c2a73a56..e171d0940d5 100644 --- a/pkg/util/k8s/pod.go +++ b/pkg/util/k8s/pod.go @@ -20,3 +20,15 @@ import v1 "k8s.io/api/core/v1" func IsPodTerminated(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded } + +// GetPodContainersNames returns all the container names in a Pod, including init containers. +func GetPodContainerNames(pod *v1.Pod) []string { + var names []string + for _, c := range pod.Spec.InitContainers { + names = append(names, c.Name) + } + for _, c := range pod.Spec.Containers { + names = append(names, c.Name) + } + return names +} diff --git a/test/e2e/connectivity_test.go b/test/e2e/connectivity_test.go index 6c6c33c0ff3..6ede5dc966b 100644 --- a/test/e2e/connectivity_test.go +++ b/test/e2e/connectivity_test.go @@ -70,6 +70,10 @@ func TestConnectivity(t *testing.T) { skipIfNumNodesLessThan(t, 2) testPingLargeMTU(t, data) }) + t.Run("testWindowsPodConnectivityAfterAntreaRestart", func(t *testing.T) { + skipIfNoWindowsNodes(t) + testWindowsPodConnectivityAfterAntreaRestart(t, data) + }) } func waitForPodIPs(t *testing.T, data *TestData, podInfos []PodInfo) map[string]*PodIPs { @@ -121,6 +125,100 @@ func (data *TestData) runPingMesh(t *testing.T, podInfos []PodInfo, ctrname stri } } +// verifyWindowsPodConnectivity checks Pod connectivity after antrea-agent is restarted on Windows. +// We test both the generic Pod case and the host-network Pod case, because CNI on Windows is also +// responsible for the host-network Pod's networking as long as it is not using host-process containers. +func testWindowsPodConnectivityAfterAntreaRestart(t *testing.T, data *TestData) { + linuxWorkerNode := clusterInfo.controlPlaneNodeName + linuxPodName := randName("test-pod-") + clientPod := PodInfo{ + Name: linuxPodName, + Namespace: data.testNamespace, + NodeName: linuxWorkerNode, + OS: "linux", + } + + t.Logf("Creating Linux Pod %s on Node '%s'", linuxPodName, linuxWorkerNode) + if err := data.createToolboxPodOnNode(clientPod.Name, clientPod.Namespace, clientPod.NodeName, false); err != nil { + t.Fatalf("Error when creating Pod '%s': %v", clientPod.Name, err) + } + defer deletePodWrapper(t, data, clientPod.Namespace, clientPod.Name) + + t.Run("testGenericPodConnectivity", func(t *testing.T) { + data.verifyWindowsPodConnectivity(t, clientPod, false) + }) + t.Run("testHostNetworkPodConnectivity", func(t *testing.T) { + data.verifyWindowsPodConnectivity(t, clientPod, true) + }) +} + +func (data *TestData) dumpOVSFlows(t *testing.T, workerNode string) []string { + ovsOfctlCmd := "ovs-ofctl" + if clusterInfo.nodesOS[workerNode] == "windows" { + ovsOfctlCmd = `c:/openvswitch/usr/bin/ovs-ofctl.exe` + } + cmd := []string{ovsOfctlCmd, "dump-flows", defaultBridgeName, "--names"} + antreaPodName, err := data.getAntreaPodOnNode(workerNode) + if err != nil { + t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", workerNode, err) + } + stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) + if err != nil { + t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err) + } + flows := make([]string, 0) + for _, flow := range strings.Split(stdout, "\n") { + flow = strings.TrimSpace(flow) + if flow == "" { + continue + } + flows = append(flows, flow) + } + t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", len(flows), defaultBridgeName, workerNode) + return flows +} + +func (data *TestData) verifyWindowsPodConnectivity(t *testing.T, clientPod PodInfo, useHostNetwork bool) { + winPodName := randName("test-pod-") + winWorkerNode := workerNodeName(clusterInfo.windowsNodes[0]) + winPod := PodInfo{ + Name: winPodName, + Namespace: data.testNamespace, + NodeName: winWorkerNode, + OS: "windows", + } + t.Logf("Creating Windows Pod %s on Node '%s'", winPodName, winWorkerNode) + if err := data.createToolboxPodOnNode(winPod.Name, winPod.Namespace, winPod.NodeName, useHostNetwork); err != nil { + t.Fatalf("Error when creating Pod '%s': %v", winPodName, err) + } + defer deletePodWrapper(t, data, winPod.Namespace, winPod.Name) + + testPodInfos := []PodInfo{clientPod, winPod} + + // Verify Pod connectivity before agent restart + data.runPingMesh(t, testPodInfos, toolboxContainerName, true) + + // Count the OVS flows. + initialOVSFlows := data.dumpOVSFlows(t, winWorkerNode) + + // Restart Antrea agent Pods + err := data.RestartAntreaAgentPods(defaultTimeout) + assert.NoError(t, err) + + // Wait until Agent completes reconcile and OpenFlows replay. + err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 1*time.Minute, false, func(ctx context.Context) (done bool, err error) { + newOVSFlows := data.dumpOVSFlows(t, winWorkerNode) + if len(newOVSFlows) != len(initialOVSFlows) { + return false, nil + } + return true, nil + }) + assert.NoErrorf(t, err, "The Openflow entries should be consistent after Antrea agent restarts on Windows Node %s", winWorkerNode) + + // Verify Pod connectivity after agent restart + data.runPingMesh(t, testPodInfos, toolboxContainerName, true) +} + func (data *TestData) testPodConnectivitySameNode(t *testing.T) { numPods := 2 // can be increased podInfos := make([]PodInfo, numPods) @@ -411,24 +509,6 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) { } t.Logf("The Antrea Pod for Node '%s' is '%s'", workerNode, antreaPodName) - dumpFlows := func() []string { - cmd := []string{"ovs-ofctl", "dump-flows", defaultBridgeName, "--names"} - stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) - if err != nil { - t.Fatalf("error when dumping flows: <%v>, err: <%v>", stderr, err) - } - flows := make([]string, 0) - for _, flow := range strings.Split(stdout, "\n") { - flow = strings.TrimSpace(flow) - if flow == "" { - continue - } - flows = append(flows, flow) - } - count := len(flows) - t.Logf("Counted %d flow in OVS bridge '%s' for Node '%s'", count, defaultBridgeName, workerNode) - return flows - } dumpGroups := func() []string { cmd := []string{"ovs-ofctl", "dump-groups", defaultBridgeName} stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, ovsContainerName, cmd) @@ -449,7 +529,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) { return groups } - flows1, groups1 := dumpFlows(), dumpGroups() + flows1, groups1 := data.dumpOVSFlows(t, workerNode), dumpGroups() numFlows1, numGroups1 := len(flows1), len(groups1) // This is necessary because "ovs-ctl restart" saves and restores OpenFlow flows for the @@ -486,7 +566,7 @@ func testOVSFlowReplay(t *testing.T, data *TestData, namespace string) { t.Logf("Running second ping mesh to check that flows have been restored") data.runPingMesh(t, podInfos, toolboxContainerName, true) - flows2, groups2 := dumpFlows(), dumpGroups() + flows2, groups2 := data.dumpOVSFlows(t, workerNode), dumpGroups() numFlows2, numGroups2 := len(flows2), len(groups2) if !assert.Equal(t, numFlows1, numFlows2, "Mismatch in OVS flow count after flow replay") { fmt.Println("Flows before replay:")