Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "Handle updates to the node OVN transit switch IP"" #3193

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions pkg/routeagent_driver/handlers/ovn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ import (
type NewOVSDBClientFn func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error)

type HandlerConfig struct {
Namespace string
ClusterCIDR []string
ServiceCIDR []string
SubmClient clientset.Interface
K8sClient kubernetes.Interface
DynClient dynamic.Interface
WatcherConfig *watcher.Config
NewOVSDBClient NewOVSDBClientFn
Namespace string
ClusterCIDR []string
ServiceCIDR []string
SubmClient clientset.Interface
K8sClient kubernetes.Interface
DynClient dynamic.Interface
WatcherConfig *watcher.Config
NewOVSDBClient NewOVSDBClientFn
TransitSwitchIP TransitSwitchIPGetter
}

type Handler struct {
Expand Down Expand Up @@ -124,7 +125,7 @@ func (ovn *Handler) Init() error {
return err
}

nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.K8sClient, ovn.Namespace)
nonGatewayRouteController, err := NewNonGatewayRouteController(*ovn.WatcherConfig, connectionHandler, ovn.Namespace, ovn.TransitSwitchIP)
if err != nil {
return err
}
Expand Down
97 changes: 78 additions & 19 deletions pkg/routeagent_driver/handlers/ovn/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ const (
var _ = Describe("Handler", func() {
t := newTestDriver()

var ovsdbClient *fakeovn.OVSDBClient
var (
ovsdbClient *fakeovn.OVSDBClient
transitSwitchIP ovn.TransitSwitchIP
)

BeforeEach(func() {
ovsdbClient = fakeovn.NewOVSDBClient()
Expand Down Expand Up @@ -79,6 +82,8 @@ var _ = Describe("Handler", func() {

restMapper := test.GetRESTMapperFor(&submarinerv1.GatewayRoute{}, &submarinerv1.NonGatewayRoute{})

transitSwitchIP = ovn.NewTransitSwitchIP()

t.Start(ovn.NewHandler(&ovn.HandlerConfig{
Namespace: testing.Namespace,
ClusterCIDR: []string{clusterCIDR},
Expand All @@ -93,9 +98,11 @@ var _ = Describe("Handler", func() {
NewOVSDBClient: func(_ model.ClientDBModel, _ ...libovsdbclient.Option) (libovsdbclient.Client, error) {
return ovsdbClient, nil
},
TransitSwitchIP: transitSwitchIP,
}))

Expect(ovsdbClient.Connected()).To(BeTrue())
Expect(transitSwitchIP.Init(t.k8sClient)).To(Succeed())
})

When("a remote Endpoint is created, updated, and deleted", func() {
Expand Down Expand Up @@ -254,37 +261,89 @@ var _ = Describe("Handler", func() {
})
})

When("a NonGatewayRoute is created and deleted", func() {
When("NonGatewayRoutes are created, updated and deleted", func() {
verifyLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nextHop),
})
}
}

verifyNoLogicalRouterPolicies := func(ngr *submarinerv1.NonGatewayRoute, nextHop string) {
for _, cidr := range ngr.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nextHop),
})
}
}

It("should correctly reconcile OVN router policies", func() {
client := t.dynClient.Resource(submarinerv1.SchemeGroupVersion.WithResource("nongatewayroutes")).Namespace(testing.Namespace)

nonGWRoute := &submarinerv1.NonGatewayRoute{
By("Creating first NonGatewayRoute")

nextHop := "172.1.1.1"

nonGWRoute1 := &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nongateway-route",
Name: "test-nongateway-route1",
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
NextHops: []string{"111.1.1.1"},
RemoteCIDRs: []string{"192.0.1.0/24", "192.0.2.0/24"},
NextHops: []string{nextHop},
RemoteCIDRs: []string{"111.0.1.0/24", "111.0.2.0/24"},
},
}

test.CreateResource(client, nonGWRoute)
test.CreateResource(client, nonGWRoute1)

for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
})
}
verifyLogicalRouterPolicies(nonGWRoute1, nextHop)

Expect(client.Delete(context.Background(), nonGWRoute.Name, metav1.DeleteOptions{})).To(Succeed())
By("Creating second NonGatewayRoute")

for _, cidr := range nonGWRoute.RoutePolicySpec.RemoteCIDRs {
ovsdbClient.AwaitNoModel(&nbdb.LogicalRouterPolicy{
Match: cidr,
Nexthop: ptr.To(nonGWRoute.RoutePolicySpec.NextHops[0]),
})
nonGWRoute2 := &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Name: "test-nongateway-route2",
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
NextHops: []string{nextHop},
RemoteCIDRs: []string{"222.0.1.0/24", "222.0.2.0/24"},
},
}

test.CreateResource(client, nonGWRoute2)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)

By("Updating NextHop for first NonGatewayRoute")

prevNextHop := nextHop
nextHop = "172.1.1.2"
nonGWRoute1.RoutePolicySpec.NextHops[0] = nextHop

test.UpdateResource(client, nonGWRoute1)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyNoLogicalRouterPolicies(nonGWRoute1, prevNextHop)
verifyNoLogicalRouterPolicies(nonGWRoute2, prevNextHop)

By("Updating NextHop for second NonGatewayRoute")

nonGWRoute2.RoutePolicySpec.NextHops[0] = nextHop

test.UpdateResource(client, nonGWRoute2)

verifyLogicalRouterPolicies(nonGWRoute1, nextHop)
verifyLogicalRouterPolicies(nonGWRoute2, nextHop)

By("Deleting first NonGatewayRoute")

Expect(client.Delete(context.Background(), nonGWRoute1.Name, metav1.DeleteOptions{})).To(Succeed())

verifyNoLogicalRouterPolicies(nonGWRoute1, nextHop)
})
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@ import (
"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/watcher"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
)

type NonGatewayRouteController struct {
nonGatewayRouteWatcher watcher.Interface
connectionHandler *ConnectionHandler
remoteSubnets sets.Set[string]
stopCh chan struct{}
transitSwitchIP string
transitSwitchIP TransitSwitchIPGetter
}

//nolint:gocritic // Ignore hugeParam
func NewNonGatewayRouteController(config watcher.Config, connectionHandler *ConnectionHandler,
k8sClientSet clientset.Interface, namespace string,
namespace string, transitSwitchIP TransitSwitchIPGetter,
) (*NonGatewayRouteController, error) {
// We'll panic if config is nil, this is intentional
var err error
Expand All @@ -47,6 +45,7 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
connectionHandler: connectionHandler,
remoteSubnets: sets.New[string](),
stopCh: make(chan struct{}),
transitSwitchIP: transitSwitchIP,
}

config.ResourceConfigs = []watcher.ResourceConfig{
Expand All @@ -62,25 +61,6 @@ func NewNonGatewayRouteController(config watcher.Config, connectionHandler *Conn
},
}

node, err := nodeutil.GetLocalNode(k8sClientSet)
if err != nil {
return nil, errors.Wrap(err, "error getting the local node info")
}

annotations := node.GetAnnotations()

transitSwitchIP := annotations["k8s.ovn.org/node-transit-switch-port-ifaddr"]
if transitSwitchIP == "" {
// This is a non-IC setup , so this controller will not be started.
logger.Infof("No transit switch IP configured on node %q", node.Name)
return controller, nil
}

controller.transitSwitchIP, err = jsonToIP(transitSwitchIP)
if err != nil {
return nil, errors.Wrapf(err, "error parsing transit switch IP")
}

controller.nonGatewayRouteWatcher, err = watcher.New(&config)
if err != nil {
return nil, errors.Wrap(err, "error creating resource watcher")
Expand Down Expand Up @@ -129,7 +109,7 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
}

// If this node belongs to same zone as gateway node, ignore the event.
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP {
if submNonGWRoute.RoutePolicySpec.NextHops[0] != g.transitSwitchIP.Get() {
for _, subnet := range submNonGWRoute.RoutePolicySpec.RemoteCIDRs {
if addSubnet {
g.remoteSubnets.Insert(subnet)
Expand All @@ -145,7 +125,5 @@ func (g *NonGatewayRouteController) reconcileRemoteSubnets(submNonGWRoute *subma
}

func (g *NonGatewayRouteController) stop() {
if g.transitSwitchIP != "" {
close(g.stopCh)
}
close(g.stopCh)
}
77 changes: 47 additions & 30 deletions pkg/routeagent_driver/handlers/ovn/non_gateway_route_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,32 @@ import (
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/cni"
"github.com/submariner-io/submariner/pkg/event"
nodeutil "github.com/submariner-io/submariner/pkg/node"
"github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes"
)

type NonGatewayRouteHandler struct {
event.HandlerBase
event.NodeHandlerBase
smClient submarinerClientset.Interface
k8sClient clientset.Interface
transitSwitchIP string
k8sClient kubernetes.Interface
transitSwitchIP TransitSwitchIP
}

func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient clientset.Interface) *NonGatewayRouteHandler {
func NewNonGatewayRouteHandler(smClient submarinerClientset.Interface, k8sClient kubernetes.Interface, transitSwitchIP TransitSwitchIP,
) *NonGatewayRouteHandler {
return &NonGatewayRouteHandler{
smClient: smClient,
k8sClient: k8sClient,
smClient: smClient,
k8sClient: k8sClient,
transitSwitchIP: transitSwitchIP,
}
}

func (h *NonGatewayRouteHandler) Init() error {
logger.Info("Starting NonGatewayRouteHandler")

node, err := nodeutil.GetLocalNode(h.k8sClient)
if err != nil {
return errors.Wrap(err, "error getting the g/w node")
}

annotations := node.GetAnnotations()

// TODO transitSwitchIP changes support needs to be added.
transitSwitchIP, ok := annotations[constants.OvnTransitSwitchIPAnnotation]
if !ok {
logger.Infof("No transit switch IP configured")
return nil
}

h.transitSwitchIP, err = jsonToIP(transitSwitchIP)

return errors.Wrapf(err, "error parsing the transit switch IP")
return errors.Wrap(h.transitSwitchIP.Init(h.k8sClient), "error initializing TransitSwitchIP")
}

func (h *NonGatewayRouteHandler) GetName() string {
Expand All @@ -80,7 +65,7 @@ func (h *NonGatewayRouteHandler) GetNetworkPlugins() []string {
}

func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.Endpoint) error {
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
return nil
}

Expand All @@ -98,7 +83,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointCreated(endpoint *submarinerv1.En
}

func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.Endpoint) error {
if !h.State().IsOnGateway() || h.transitSwitchIP == "" {
if !h.State().IsOnGateway() || h.transitSwitchIP.Get() == "" {
return nil
}

Expand All @@ -113,7 +98,7 @@ func (h *NonGatewayRouteHandler) RemoteEndpointRemoved(endpoint *submarinerv1.En
}

func (h *NonGatewayRouteHandler) TransitionToGateway() error {
if h.transitSwitchIP == "" {
if h.transitSwitchIP.Get() == "" {
return nil
}

Expand Down Expand Up @@ -141,6 +126,38 @@ func (h *NonGatewayRouteHandler) TransitionToGateway() error {
return nil
}

func (h *NonGatewayRouteHandler) NodeUpdated(node *corev1.Node) error {
updated, err := h.transitSwitchIP.UpdateFrom(node)
if err != nil {
logger.Errorf(err, "Error updating transit switch IP from node: %s", resource.ToJSON(node))
return nil
}

if !updated {
return nil
}

logger.Infof("Transit switch IP updated to %s", h.transitSwitchIP.Get())

if !h.State().IsOnGateway() {
return nil
}

endpoints := h.State().GetRemoteEndpoints()
for i := range endpoints {
err = util.Update(context.TODO(), NonGatewayResourceInterface(h.smClient, endpoints[i].Namespace),
h.newNonGatewayRoute(&endpoints[i]), func(existing *submarinerv1.NonGatewayRoute) (*submarinerv1.NonGatewayRoute, error) {
existing.RoutePolicySpec.NextHops = []string{h.transitSwitchIP.Get()}
return existing, nil
})
if err != nil {
return errors.Wrapf(err, "error updating NonGatewayRoute")
}
}

return nil
}

func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpoint) *submarinerv1.NonGatewayRoute {
return &submarinerv1.NonGatewayRoute{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -149,7 +166,7 @@ func (h *NonGatewayRouteHandler) newNonGatewayRoute(endpoint *submarinerv1.Endpo
},
RoutePolicySpec: submarinerv1.RoutePolicySpec{
RemoteCIDRs: endpoint.Spec.Subnets,
NextHops: []string{h.transitSwitchIP},
NextHops: []string{h.transitSwitchIP.Get()},
},
}
}
Expand Down
Loading
Loading