Skip to content

Commit

Permalink
Merge pull request #178 from Nordix/issue/164
Browse files Browse the repository at this point in the history
NSM connection (VIPs and routes) update in TAPA
  • Loading branch information
LionelJouin authored Apr 12, 2022
2 parents ff6f220 + 99b2dce commit 59fc5ad
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 127 deletions.
3 changes: 0 additions & 3 deletions examples/target/helm/templates/target.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ spec:
- name: meridio-socket
mountPath: /var/lib/meridio
readOnly: false
securityContext:
capabilities:
add: ["NET_ADMIN"]
volumes:
- name: spire-agent-socket
hostPath:
Expand Down
127 changes: 77 additions & 50 deletions pkg/ambassador/tap/conduit/conduit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/nordix/meridio/pkg/ambassador/tap/types"
"github.com/nordix/meridio/pkg/conduit"
"github.com/nordix/meridio/pkg/networking"
"github.com/nordix/meridio/pkg/retry"
"github.com/sirupsen/logrus"
)

Expand All @@ -53,8 +57,9 @@ type Conduit struct {
StreamFactory StreamFactory
connection *networkservice.Connection
mu sync.Mutex
vips []*virtualIP
tableID int
localIPs []string
ctx context.Context
cancel context.CancelFunc
}

// New is the constructor of Conduit.
Expand All @@ -76,8 +81,7 @@ func New(conduit *ambassadorAPI.Conduit,
NetworkServiceClient: networkServiceClient,
NetUtils: netUtils,
connection: nil,
vips: []*virtualIP{},
tableID: 1,
localIPs: []string{},
}
c.StreamFactory = stream.NewFactory(targetRegistryClient, stream.MaxNumberOfTargets, c)
c.StreamManager = NewStreamManager(configurationManagerClient, targetRegistryClient, streamRegistry, c.StreamFactory, PendingTime)
Expand All @@ -93,9 +97,10 @@ func (c *Conduit) Connect(ctx context.Context) error {
if c.isConnected() {
return nil
}
c.ctx, c.cancel = context.WithCancel(ctx)
logrus.Infof("Attempt to connect conduit: %v", c.Conduit)
nsName := conduit.GetNetworkServiceNameWithProxy(c.Conduit.GetName(), c.Conduit.GetTrench().GetName(), c.Namespace)
connection, err := c.NetworkServiceClient.Request(ctx,
connection, err := c.NetworkServiceClient.Request(c.ctx,
&networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: fmt.Sprintf("%s-%s-%d", c.TargetName, nsName, 0),
Expand All @@ -117,6 +122,7 @@ func (c *Conduit) Connect(ctx context.Context) error {
}
logrus.Infof("Conduit connected: %v", c.Conduit)
c.connection = connection
c.localIPs = c.connection.GetContext().GetIpContext().GetSrcIpAddrs()

c.Configuration.Watch()

Expand All @@ -127,14 +133,14 @@ func (c *Conduit) Connect(ctx context.Context) error {
// Disconnect closes the connection from NSM, closes all streams
// and stop the VIP watcher
func (c *Conduit) Disconnect(ctx context.Context) error {
if c.cancel != nil {
c.cancel()
}
c.mu.Lock()
defer c.mu.Unlock()
logrus.Infof("Disconnect from conduit: %v", c.Conduit)
// Stops the configuration
c.Configuration.Stop()
// reset the VIPs related configuration
c.deleteVIPs(c.vips)
c.tableID = 1
var errFinal error
// Stop the stream manager (close the streams)
errFinal = c.StreamManager.Stop(ctx)
Expand Down Expand Up @@ -171,10 +177,10 @@ func (c *Conduit) GetConduit() *ambassadorAPI.Conduit {
return c.Conduit
}

// GetStreams returns the local IPs for this conduit
// GetIPs returns the local IPs for this conduit
func (c *Conduit) GetIPs() []string {
if c.connection != nil {
return c.connection.GetContext().GetIpContext().GetSrcIpAddrs()
return c.localIPs
}
return []string{}
}
Expand All @@ -186,34 +192,69 @@ func (c *Conduit) SetVIPs(vips []string) error {
if !c.isConnected() {
return nil
}
currentVIPs := make(map[string]*virtualIP)
for _, vip := range c.vips {
currentVIPs[vip.prefix] = vip
// prepare SrcIpAddrs (IPs allocated by the proxy + VIPs)
c.connection.Context.IpContext.SrcIpAddrs = append(c.GetIPs(), vips...)
// prepare the routes (nexthops = proxy bridge IPs)
ipv4Nexthops := []*networkservice.Route{}
ipv6Nexthops := []*networkservice.Route{}
for _, nexthop := range c.getGateways() {
gw, _, err := net.ParseCIDR(nexthop)
if err != nil {
continue
}
route := &networkservice.Route{
NextHop: gw.String(),
}
if isIPv6(nexthop) {
route.Prefix = "::/0"
ipv6Nexthops = append(ipv6Nexthops, route)
} else {
route.Prefix = "0.0.0.0/0"
ipv4Nexthops = append(ipv4Nexthops, route)
}
}
// prepare the policies (only based on VIP address for now)
c.connection.Context.IpContext.Policies = []*networkservice.PolicyRoute{}
for _, vip := range vips {
if _, ok := currentVIPs[vip]; !ok {
newVIP, err := newVirtualIP(vip, c.tableID, c.NetUtils)
if err != nil {
logrus.Errorf("SimpleTarget: Error adding SourceBaseRoute: %v", err) // todo: err handling
continue
}
c.tableID++
c.vips = append(c.vips, newVIP)
for _, nexthop := range c.getGateways() {
err = newVIP.AddNexthop(nexthop)
if err != nil {
logrus.Errorf("Client: Error adding nexthop: %v", err) // todo: err handling
}
}
nexthops := ipv4Nexthops
if isIPv6(vip) {
nexthops = ipv6Nexthops
}
delete(currentVIPs, vip)
}
// delete remaining vips
vipsSlice := []*virtualIP{}
for _, vip := range currentVIPs {
vipsSlice = append(vipsSlice, vip)
newPolicyRoute := &networkservice.PolicyRoute{
From: vip,
Routes: nexthops,
}
c.connection.Context.IpContext.Policies = append(c.connection.Context.IpContext.Policies, newPolicyRoute)
}
c.deleteVIPs(vipsSlice)
var err error
c.ctx, c.cancel = context.WithCancel(context.TODO())
// update the NSM connection
err = retry.Do(func() error {
c.connection, err = c.NetworkServiceClient.Request(c.ctx,
&networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: c.connection.GetId(),
NetworkService: c.connection.GetNetworkService(),
Labels: c.connection.GetLabels(),
Payload: c.connection.GetPayload(),
Context: &networkservice.ConnectionContext{
IpContext: c.connection.GetContext().GetIpContext(),
},
},
MechanismPreferences: []*networkservice.Mechanism{
{
Cls: cls.LOCAL,
Type: kernelmech.MECHANISM,
},
},
})
if err != nil {
return fmt.Errorf("error updating the VIPs in conduit: %v - %v", c.Conduit, err)
}
return nil
}, retry.WithContext(c.ctx),
retry.WithDelay(500*time.Millisecond))
logrus.Infof("VIPs in conduit updated: %v - %v", c.Conduit, vips)
return nil
}

Expand All @@ -226,22 +267,8 @@ func (c *Conduit) isConnected() bool {
return c.connection != nil
}

func (c *Conduit) deleteVIPs(vips []*virtualIP) {
vipsMap := make(map[string]*virtualIP)
for _, vip := range vips {
vipsMap[vip.prefix] = vip
}
for index := 0; index < len(c.vips); index++ {
vip := c.vips[index]
if _, ok := vipsMap[vip.prefix]; ok {
c.vips = append(c.vips[:index], c.vips[index+1:]...)
index--
err := vip.Delete()
if err != nil {
logrus.Errorf("Client: Error deleting vip: %v", err) // todo: err handling
}
}
}
func isIPv6(address string) bool {
return strings.Count(address, ":") >= 2
}

// TODO: Requires the IPs of the bridge
Expand Down
74 changes: 0 additions & 74 deletions pkg/ambassador/tap/conduit/vip.go

This file was deleted.

0 comments on commit 59fc5ad

Please sign in to comment.