Skip to content

Commit

Permalink
Refaktor prefix listening.
Browse files Browse the repository at this point in the history
Signed-off-by: Vitaliy Guschin <vitaliy.guschin@spirent.com>
  • Loading branch information
Vitaliy Guschin committed Mar 21, 2024
1 parent 66aac6d commit 0607bdb
Showing 1 changed file with 35 additions and 31 deletions.
66 changes: 35 additions & 31 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Config) Process() error {
return nil
}

func startListenPrefixes(ctx context.Context, c *Config, tlsClientConfig *tls.Config, serverSubscriptions []chan *ipam.PrefixResponse, clientSubscriptions []chan *ipam.PrefixResponse) {
func startListenPrefixes(ctx context.Context, c *Config, tlsClientConfig *tls.Config, subscriptions []chan *ipam.PrefixResponse) {
for i, prefixServerURL := range c.PrefixServerURL {
go func(i int, prefixServerURL url.URL) {
log.FromContext(ctx).Infof("Start listening prefix server %d: %s", i, prefixServerURL)
Expand Down Expand Up @@ -180,8 +180,7 @@ func startListenPrefixes(ctx context.Context, c *Config, tlsClientConfig *tls.Co
for resp, recvErr := managePrefixClient.Recv(); recvErr == nil; resp, recvErr = managePrefixClient.Recv() {
if !proto.Equal(previousResponse, resp) {
previousResponse = resp
serverSubscriptions[i] <- resp
clientSubscriptions[i] <- resp
subscriptions[i] <- resp
}
}
}
Expand Down Expand Up @@ -369,23 +368,20 @@ func main() {
// ********************************************************************************
log.FromContext(ctx).Infof("executing phase 6: create and register nse with nsm")
// ********************************************************************************

prefixServerCount := len(config.PrefixServerURL)
var serverSubscriptions []chan *ipam.PrefixResponse
var clientSubscriptions []chan *ipam.PrefixResponse
var subscriptions []chan *ipam.PrefixResponse
for i := 0; i < prefixServerCount; i++ {
serverSubscriptions = append(serverSubscriptions, make(chan *ipam.PrefixResponse, 1))
clientSubscriptions = append(clientSubscriptions, make(chan *ipam.PrefixResponse, 1))
subscriptions = append(subscriptions, make(chan *ipam.PrefixResponse, 1))
}
var closeSubscribedChannels = func() {
for i := 0; i < prefixServerCount; i++ {
close(serverSubscriptions[i])
close(clientSubscriptions[i])
close(subscriptions[i])
}
}
startListenPrefixes(ctx, config, tlsClientConfig, serverSubscriptions, clientSubscriptions)
startListenPrefixes(ctx, config, tlsClientConfig, subscriptions)
ipams := extractIPAMList(ctx, subscriptions)

server := createVl3Endpoint(ctx, cancel, config, vppConn, tlsServerConfig, source, loopOptions, vrfOptions, serverSubscriptions)
server := createVl3Endpoint(ctx, cancel, config, vppConn, tlsServerConfig, source, loopOptions, vrfOptions, ipams)

srvErrCh := grpcutils.ListenAndServe(ctx, listenOn, server)
exitOnErr(ctx, cancel, srvErrCh)
Expand Down Expand Up @@ -445,7 +441,7 @@ func main() {
config.dnsServerAddr = conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP
config.dnsServerAddrCh <- conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP

vl3Client := createVl3Client(ctx, config, vppConn, tlsClientConfig, source, loopOptions, vrfOptions, clientSubscriptions, clientAdditionalFunctionality...)
vl3Client := createVl3Client(ctx, config, vppConn, tlsClientConfig, source, loopOptions, vrfOptions, ipams, clientAdditionalFunctionality...)
for _, nse := range nseList {
if nse.Name == config.Name {
continue
Expand Down Expand Up @@ -495,8 +491,22 @@ func main() {
closeSubscribedChannels()
}

func extractIPAMList(ctx context.Context, subscriptions []chan *ipam.PrefixResponse) []*vl3.IPAM {
var ipams []*vl3.IPAM
for _, prefixCh := range subscriptions {
var vl3ipam vl3.IPAM
go func() {
for prefix := range prefixCh {
vl3ipam.Reset(ctx, prefix.Prefix, prefix.ExcludePrefixes)
}
}()
ipams = append(ipams, &vl3ipam)
}
return ipams
}

func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Connection, tlsClientConfig *tls.Config, source x509svid.Source,
loopOpts []loopback.Option, vrfOpts []vrf.Option, prefixChs []chan *ipam.PrefixResponse, clientAdditionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
loopOpts []loopback.Option, vrfOpts []vrf.Option, ipams []*vl3.IPAM, clientAdditionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
dialOptions := append(tracing.WithTracingDial(),
grpcfd.WithChainStreamInterceptor(),
grpcfd.WithChainUnaryInterceptor(),
Expand All @@ -513,21 +523,14 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn
),
)

var clientIpam vl3.IPAM
go func() {
for prefix := range prefixCh {
clientIpam.Reset(ctx, prefix.Prefix, prefix.ExcludePrefixes)
}
}()

c := client.NewClient(
ctx,
client.WithClientURL(&config.ConnectTo),
client.WithName(config.Name),
client.WithAdditionalFunctionality(
append(
clientAdditionalFunctionality,
vl3.NewClient(ctx, &clientIpam),
newMultiIPAMClient(ctx, ipams),
vl3dns.NewClient(config.dnsServerAddr, &config.dnsConfigs),
up.NewClient(ctx, vppConn, up.WithLoadSwIfIndex(loopback.Load)),
ipaddress.NewClient(vppConn, ipaddress.WithLoadSwIfIndex(loopback.Load)),
Expand All @@ -550,15 +553,16 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn
return retry.NewClient(c)
}

func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *Config, vppConn vpphelper.Connection, tlsServerConfig *tls.Config,
source x509svid.Source, loopOpts []loopback.Option, vrfOpts []vrf.Option, prefixCh <-chan *ipam.PrefixResponse) *grpc.Server {
var serverIpam vl3.IPAM
go func() {
for prefix := range prefixCh {
serverIpam.Reset(ctx, prefix.Prefix, prefix.ExcludePrefixes)
}
}()
func newMultiIPAMClient(ctx context.Context, ipams []*vl3.IPAM) networkservice.NetworkServiceClient {
var clients []networkservice.NetworkServiceClient
for _, ipam := range ipams {
clients = append(clients, vl3.NewClient(ctx, ipam))
}
return chain.NewNetworkServiceClient(clients...)
}

func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *Config, vppConn vpphelper.Connection, tlsServerConfig *tls.Config,
source x509svid.Source, loopOpts []loopback.Option, vrfOpts []vrf.Option, ipams []*vl3.IPAM) *grpc.Server {
vl3Endpoint := endpoint.NewServer(ctx,
spiffejwt.TokenGeneratorFunc(source, config.MaxTokenLifetime),
endpoint.WithName(config.Name),
Expand All @@ -571,7 +575,7 @@ func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *C
vl3dns.WithConfigs(&config.dnsConfigs),
),
vl3mtu.NewServer(),
strictvl3ipam.NewServer(ctx, vl3.NewServer, &serverIpam),
strictvl3ipam.NewServer(ctx, vl3.NewServer, ipams...),
up.NewServer(ctx, vppConn, up.WithLoadSwIfIndex(loopback.Load)),
ipaddress.NewServer(vppConn, ipaddress.WithLoadSwIfIndex(loopback.Load)),
unnumbered.NewServer(vppConn, loopback.Load),
Expand Down

0 comments on commit 0607bdb

Please sign in to comment.