Skip to content

Commit

Permalink
Use HandlerState in the OVN Handler
Browse files Browse the repository at this point in the history
The mutex was kept to provide atomicity when updating the gateway and
host network dataplane since these are also invoked via the route config
syncer which runs on its own thread. The synchronization may actually not
be necessary but it's better to be safe and the overhead in negligible.

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
  • Loading branch information
tpantelis committed Nov 15, 2023
1 parent 0f26727 commit 3013f75
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 46 deletions.
3 changes: 3 additions & 0 deletions pkg/routeagent_driver/handlers/ovn/gateway_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func (ovn *Handler) cleanupGatewayDataplane() error {
}

func (ovn *Handler) updateGatewayDataplane() error {
ovn.mutex.Lock()
defer ovn.mutex.Unlock()

currentRuleRemotes, err := ovn.getExistingIPv4RuleSubnets()
if err != nil {
return errors.Wrapf(err, "error reading ip rule list for IPv4")
Expand Down
54 changes: 13 additions & 41 deletions pkg/routeagent_driver/handlers/ovn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ type Handler struct {
HandlerConfig
mutex sync.Mutex
cableRoutingInterface *net.Interface
remoteEndpoints map[string]*submV1.Endpoint
isGateway bool
netLink netlink.Interface
ipt iptables.Interface
gatewayRouteController *GatewayRouteController
Expand All @@ -77,11 +75,10 @@ func NewHandler(config *HandlerConfig) *Handler {
}

h := &Handler{
HandlerConfig: *config,
remoteEndpoints: map[string]*submV1.Endpoint{},
netLink: netlink.New(),
ipt: ipt,
stopCh: make(chan struct{}),
HandlerConfig: *config,
netLink: netlink.New(),
ipt: ipt,
stopCh: make(chan struct{}),
}

if h.NewOVSDBClient == nil {
Expand Down Expand Up @@ -153,9 +150,6 @@ func (ovn *Handler) LocalEndpointCreated(endpoint *submV1.Endpoint) error {
}
}

ovn.mutex.Lock()
defer ovn.mutex.Unlock()

ovn.cableRoutingInterface = routingInterface

return nil
Expand All @@ -169,17 +163,12 @@ func (ovn *Handler) RemoteEndpointCreated(endpoint *submV1.Endpoint) error {
return nil
}

ovn.mutex.Lock()
defer ovn.mutex.Unlock()

ovn.remoteEndpoints[endpoint.Name] = endpoint

err := ovn.updateHostNetworkDataplane()
if err != nil {
return errors.Wrapf(err, "updateHostNetworkDataplane returned error")
}

if ovn.isGateway {
if ovn.State().IsOnGateway() {
for _, subnet := range endpoint.Spec.Subnets {
if err = ovn.addNoMasqueradeIPTables(subnet); err != nil {
return errors.Wrapf(err, "error adding no-masquerade rules for subnet %q", subnet)
Expand All @@ -199,35 +188,25 @@ func (ovn *Handler) RemoteEndpointUpdated(endpoint *submV1.Endpoint) error {
return nil
}

ovn.mutex.Lock()
defer ovn.mutex.Unlock()

ovn.remoteEndpoints[endpoint.Name] = endpoint

err := ovn.updateHostNetworkDataplane()
if err != nil {
return errors.Wrapf(err, "updateHostNetworkDataplane returned error")
}

if ovn.isGateway {
if ovn.State().IsOnGateway() {
return ovn.updateGatewayDataplane()
}

return nil
}

func (ovn *Handler) RemoteEndpointRemoved(endpoint *submV1.Endpoint) error {
ovn.mutex.Lock()
defer ovn.mutex.Unlock()

delete(ovn.remoteEndpoints, endpoint.Name)

err := ovn.updateHostNetworkDataplane()
if err != nil {
return errors.Wrapf(err, "updateHostNetworkDataplane returned error")
}

if ovn.isGateway {
if ovn.State().IsOnGateway() {
for _, subnet := range endpoint.Spec.Subnets {
if err = ovn.removeNoMasqueradeIPTables(subnet); err != nil {
return errors.Wrapf(err, "error removing no-masquerade rules for subnet %q", subnet)
Expand All @@ -241,12 +220,9 @@ func (ovn *Handler) RemoteEndpointRemoved(endpoint *submV1.Endpoint) error {
}

func (ovn *Handler) TransitionToNonGateway() error {
ovn.mutex.Lock()
defer ovn.mutex.Unlock()

ovn.isGateway = false
for _, endpoint := range ovn.remoteEndpoints {
for _, subnet := range endpoint.Spec.Subnets {
endpoints := ovn.State().GetRemoteEndpoints()
for i := range endpoints {
for _, subnet := range endpoints[i].Spec.Subnets {
if err := ovn.removeNoMasqueradeIPTables(subnet); err != nil {
return errors.Wrapf(err, "error removing no-masquerade rules for subnet %q", subnet)
}
Expand All @@ -257,13 +233,9 @@ func (ovn *Handler) TransitionToNonGateway() error {
}

func (ovn *Handler) TransitionToGateway() error {
ovn.mutex.Lock()
defer ovn.mutex.Unlock()

ovn.isGateway = true

for _, endpoint := range ovn.remoteEndpoints {
for _, subnet := range endpoint.Spec.Subnets {
endpoints := ovn.State().GetRemoteEndpoints()
for i := range endpoints {
for _, subnet := range endpoints[i].Spec.Subnets {
if err := ovn.addNoMasqueradeIPTables(subnet); err != nil {
return errors.Wrapf(err, "error adding no-masquerade rules for subnet %q", subnet)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/routeagent_driver/handlers/ovn/host_networking.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
)

func (ovn *Handler) updateHostNetworkDataplane() error {
ovn.mutex.Lock()
defer ovn.mutex.Unlock()

currentRuleRemotes, err := ovn.getExistingIPv4HostNetworkRoutes()
if err != nil {
return errors.Wrapf(err, "error reading ip rule list for IPv4")
Expand Down
4 changes: 1 addition & 3 deletions pkg/routeagent_driver/handlers/ovn/route_config_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ func (ovn *Handler) handleInterfaceAddressChange() error {
logger.Infof("Waiting for interface %q to be ready: %v", OVNK8sMgmntIntfName, err)
return true
}, func() error {
ovn.mutex.Lock()
defer ovn.mutex.Unlock()
if ovn.isGateway {
if ovn.State().IsOnGateway() {
err = ovn.updateGatewayDataplane()
if err != nil {
return errors.Wrap(err, "error syncing gateway routes")
Expand Down
5 changes: 3 additions & 2 deletions pkg/routeagent_driver/handlers/ovn/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import "k8s.io/utils/set"
func (ovn *Handler) getRemoteSubnets() set.Set[string] {
endpointSubnets := set.New[string]()

for _, endpoint := range ovn.remoteEndpoints {
for _, subnet := range endpoint.Spec.Subnets {
endpoints := ovn.State().GetRemoteEndpoints()
for i := range endpoints {
for _, subnet := range endpoints[i].Spec.Subnets {
endpointSubnets.Insert(subnet)
}
}
Expand Down

0 comments on commit 3013f75

Please sign in to comment.