Skip to content

Commit

Permalink
Address review comments and fix e2e test
Browse files Browse the repository at this point in the history
We also reduce the client wait timeout from 10s to 5s in the NP
controller.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas committed May 24, 2024
1 parent 72a52e9 commit 2469992
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
10 changes: 5 additions & 5 deletions pkg/agent/client/endpoint_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type EndpointResolver struct {

func NewEndpointResolver(kubeClient kubernetes.Interface, namespace, serviceName string, servicePort int32) *EndpointResolver {
key := namespace + "/" + serviceName
controllerName := fmt.Sprintf("ServiceEndpointResolver-%s", key)
controllerName := fmt.Sprintf("ServiceEndpointResolver:%s", key)

// We only need a specific Service and corresponding Endpoints resource, so we create our
// own informer factory, and we filter by namespace and name.
Expand All @@ -106,7 +106,7 @@ func NewEndpointResolver(kubeClient kubernetes.Interface, namespace, serviceName

serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
// FilterFunc ignores all Service events which do not relate to the named Service.
// It should be redudant given the filtering that we already do at the informer level.
// It should be redundant given the filtering that we already do at the informer level.
FilterFunc: func(obj interface{}) bool {
if service, ok := obj.(*corev1.Service); ok {
return service.Namespace == namespace && service.Name == serviceName
Expand Down Expand Up @@ -146,7 +146,7 @@ func NewEndpointResolver(kubeClient kubernetes.Interface, namespace, serviceName
})
endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
// FilterFunc ignores all Endpoints events which do not relate to the named Service.
// It should be redudant given the filtering that we already do at the informer level.
// It should be redundant given the filtering that we already do at the informer level.
FilterFunc: func(obj interface{}) bool {
// The Endpoints resource for a Service has the same name as the Service.
if endpoints, ok := obj.(*corev1.Endpoints); ok {
Expand Down Expand Up @@ -273,9 +273,9 @@ func (r *EndpointResolver) updateEndpointIfNeeded(endpointURL *url.URL) {
return
}
if endpointURL != nil {
klog.InfoS("Selected Endpoint has changed for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName), "url", endpointURL)
klog.InfoS("Selected a new Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName), "url", endpointURL)
} else {
klog.InfoS("No more selected Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName))
klog.InfoS("Selected no Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName))
}
r.endpointURL.Store(endpointURL)
for _, listener := range r.listeners {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,11 @@ func (c *Controller) SetDenyConnStore(denyConnStore *connections.DenyConnectionS
// Run will not return until stopCh is closed.
func (c *Controller) Run(stopCh <-chan struct{}) {
attempts := 0
// If Antrea client is not ready within 10s, we assume that the Antrea Controller is not
// If Antrea client is not ready within 5s, we assume that the Antrea Controller is not
// available. We proceed with our watches, which are likely to fail. In turn, this will
// trigger the fallback mechanism.
ctx, cancel := context.WithTimeout(wait.ContextForChannel(stopCh), 10*time.Second)
// 5s should be more than enough if the Antrea Controller is running correctly.
ctx, cancel := context.WithTimeout(wait.ContextForChannel(stopCh), 5*time.Second)
defer cancel()
if err := wait.PollUntilContextCancel(ctx, 200*time.Millisecond, true, func(ctx context.Context) (bool, error) {
if attempts%10 == 0 {
Expand Down
27 changes: 17 additions & 10 deletions test/e2e/networkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -743,17 +744,17 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
require.NoError(t, err)
t.Cleanup(func() { data.deleteNetworkpolicy(netpol) })

checkFunc := func(testPod string, testPodIPs *PodIPs, expectErr bool) {
checkFunc := func(t assert.TestingT, testPod string, testPodIPs *PodIPs, expectErr bool) {
var wg sync.WaitGroup
checkOne := func(clientPod, serverPod string, serverIP *net.IP) {
defer wg.Done()
if serverIP != nil {
cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"}
_, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd)
if expectErr && err == nil {
t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod)
} else if !expectErr && err != nil {
t.Errorf("Pod %s should be able to connect %s, but was not able to connect, err: %v", clientPod, serverPod, err)
if expectErr {
assert.Error(t, err, "Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod)
} else {
assert.NoError(t, err, "Pod %s should be able to connect %s, but was not able to connect", clientPod, serverPod)
}
}
}
Expand Down Expand Up @@ -783,7 +784,7 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {

// While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully.
for i := 0; i < 5; i++ {
checkFunc(deniedPod, deniedPodIPs, true)
checkFunc(t, deniedPod, deniedPodIPs, true)
}

antreaPod, err := data.getAntreaPodOnNode(workerNode)
Expand All @@ -792,15 +793,21 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) {
waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse)
waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue)
// Even if the new antrea-agent can't connect to antrea-controller, the previous policy should continue working.
checkFunc(deniedPod, deniedPodIPs, true)
checkFunc(allowedPod, allowedPodIPs, false)
checkFunc(t, deniedPod, deniedPodIPs, true)
// It may take some time for the antrea-agent to fall back to locally-saved policies. Until
// it happens, allowed traffic may be dropped. So we use polling to tolerate some delay.
// The important part is that traffic that should be denied is always denied, which we have
// already validated at that point.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
checkFunc(t, allowedPod, allowedPodIPs, false)
}, 10*time.Second, 1*time.Second)

// Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller.
scaleFunc(1)
// Make sure antrea-agent connects to antrea-controller.
waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue)
checkFunc(deniedPod, deniedPodIPs, true)
checkFunc(allowedPod, allowedPodIPs, false)
checkFunc(t, deniedPod, deniedPodIPs, true)
checkFunc(t, allowedPod, allowedPodIPs, false)
}

func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) {
Expand Down

0 comments on commit 2469992

Please sign in to comment.