Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add device plugin support to CNS #2886

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8fd4a5b
feat: add device plugin support to CNS
thatmattlong Jul 26, 2024
53b67c8
Merge branch 'master' into matlong/add-device-plugin-skeleton
aggarwal0009 Aug 2, 2024
fea9c08
Add UT coverage and linter fixes
aggarwal0009 Aug 2, 2024
8cdca82
fix windows-latest lint issues
aggarwal0009 Aug 2, 2024
4b95801
Update cns/deviceplugin/pluginmanager.go
aggarwal0009 Aug 2, 2024
fdf1969
remove test run output file
aggarwal0009 Aug 2, 2024
079476b
linter fixes
aggarwal0009 Aug 5, 2024
96669dc
resolve readability related comments
aggarwal0009 Aug 5, 2024
aed17cc
Merge branch 'master' into matlong/add-device-plugin-skeleton
aggarwal0009 Aug 5, 2024
798e2e5
move nolint annotations inline
aggarwal0009 Aug 6, 2024
21a2599
Merge branch 'master' into matlong/add-device-plugin-skeleton
aggarwal0009 Aug 6, 2024
19c14a2
remove unnecessary nolint
aggarwal0009 Aug 6, 2024
788b044
update UT
aggarwal0009 Aug 7, 2024
7e17a60
deleted gitignore for test file
aggarwal0009 Aug 7, 2024
a583154
Merge branch 'master' into matlong/add-device-plugin-skeleton
aggarwal0009 Aug 7, 2024
0618ad3
fix goroutine leak
aggarwal0009 Aug 7, 2024
da78dcb
pr feedback cleanup
aggarwal0009 Aug 12, 2024
fc1ab8f
move devicePrefix to a constant
aggarwal0009 Aug 12, 2024
6c45c8c
pr refactoring
aggarwal0009 Aug 12, 2024
1a0208e
refactored to make PluginManager generic
aggarwal0009 Aug 13, 2024
9cd5e64
Update trackDevice to return nil
aggarwal0009 Aug 14, 2024
a057c17
Add documentation
aggarwal0009 Aug 14, 2024
e199d5d
fix shutdownCh initialization in server.go
aggarwal0009 Aug 14, 2024
d029fdf
Fix UTs
aggarwal0009 Aug 16, 2024
2d02474
Merge branch 'master' into matlong/add-device-plugin-skeleton
aggarwal0009 Aug 19, 2024
7aafd6b
Merge branch 'master' into matlong/add-device-plugin-skeleton
aggarwal0009 Aug 28, 2024
f355f2d
fix merge conflict errors
aggarwal0009 Aug 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions cns/deviceplugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package deviceplugin

import (
"context"
"fmt"
"net"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

"github.com/Azure/azure-container-networking/crd/multitenancy/api/v1alpha1"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// Plugin holds the state for a single device plugin: the gRPC server socket it
// serves on, the kubelet socket it registers with, and the device count for
// one resource/device type.
type Plugin struct {
	// Logger is the logger used by the plugin; NewPlugin tags it with the resource name.
	Logger *zap.Logger
	// ResourceName is the resource name sent to the kubelet in the registration request.
	ResourceName string
	// SocketWatcher is used by run to wait until the plugin socket goes away.
	SocketWatcher *SocketWatcher
	// Socket is the path of the plugin's own socket. Its base name is sent to the
	// kubelet as the registration endpoint and the file is removed on clean up.
	Socket string
	// deviceCountMutex guards deviceCount.
	deviceCountMutex sync.Mutex
	// deviceCount is written by UpdateDeviceCount and read by getDeviceCount.
	deviceCount int
	// deviceType is the device type this plugin was created for; it is used to
	// derive the socket name and to identify stale sockets in CleanOldState.
	deviceType v1alpha1.DeviceType
	// kubeletSocket is the unix socket address dialed to register with the kubelet.
	kubeletSocket string
	// deviceCheckInterval is passed through to NewServer.
	deviceCheckInterval time.Duration
	// devicePluginDirectory is the directory that holds the plugin socket and that
	// CleanOldState scans for leftovers.
	devicePluginDirectory string
}

// NewPlugin constructs a Plugin for the given resource name and device type.
// The logger is tagged with the resource name and the socket path is derived
// from pluginDir and deviceType at construction time (it embeds the current
// unix timestamp, see getSocketName).
func NewPlugin(l *zap.Logger, resourceName string, socketWatcher *SocketWatcher, pluginDir string,
	initialDeviceCount int, deviceType v1alpha1.DeviceType, kubeletSocket string, deviceCheckInterval time.Duration,
) *Plugin {
	plugin := new(Plugin)
	plugin.Logger = l.With(zap.String("resourceName", resourceName))
	plugin.ResourceName = resourceName
	plugin.SocketWatcher = socketWatcher
	plugin.Socket = getSocketName(pluginDir, deviceType)
	plugin.deviceCount = initialDeviceCount
	plugin.deviceType = deviceType
	plugin.kubeletSocket = kubeletSocket
	plugin.deviceCheckInterval = deviceCheckInterval
	plugin.devicePluginDirectory = pluginDir
	return plugin
}

// Run runs the plugin until the context is cancelled, restarting the server as needed.
//
// A clean exit of run (nil error) is restarted immediately. When run returns an
// error, Run logs it and waits a short, fixed backoff before the next attempt so
// that a persistent failure does not turn into a busy loop of restarts and error
// logs. The backoff is abandoned as soon as ctx is cancelled. The plugin socket
// is removed (see mustCleanUp) when Run returns.
func (p *Plugin) Run(ctx context.Context) {
	// restartBackoff is the pause between a failed run and the next attempt.
	const restartBackoff = time.Second

	defer p.mustCleanUp()
	for ctx.Err() == nil {
		p.Logger.Info("starting device plugin for resource", zap.String("resource", p.ResourceName))
		err := p.run(ctx)
		if err == nil {
			continue
		}
		p.Logger.Error("device plugin for resource exited", zap.String("resource", p.ResourceName), zap.Error(err))

		// Wait before retrying, but never outlive the context.
		timer := time.NewTimer(restartBackoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// run performs one lifecycle of the plugin: it starts the gRPC server, waits
// for it to report ready, registers the plugin with the kubelet, and then
// blocks until the socket watcher signals for the plugin socket.
//
// It returns nil when ctx is cancelled while waiting for readiness or once the
// socket watcher channel yields, and a wrapped error when the server fails to
// start, fails to become ready, or registration with the kubelet fails. The
// server's context is cancelled whenever run returns.
func (p *Plugin) run(ctx context.Context) error {
	serverCtx, stopServer := context.WithCancel(ctx)
	defer stopServer()

	srv := NewServer(p.Logger, p.Socket, p, p.deviceCheckInterval)

	// The server runs in the background; only a non-nil error is reported back.
	serveErrs := make(chan error, 2) //nolint:gomnd // disabled in favor of readability
	go func() {
		if serveErr := srv.Run(serverCtx); serveErr != nil {
			serveErrs <- serveErr
		}
	}()

	// Readiness is awaited in its own goroutine so it can be raced against a
	// server failure and against cancellation of the parent context.
	readyResults := make(chan error, 2) //nolint:gomnd // disabled in favor of readability
	go func() {
		readyResults <- srv.Ready(serverCtx)
	}()

	select {
	case <-ctx.Done():
		return nil
	case serveErr := <-serveErrs:
		return errors.Wrap(serveErr, "error starting grpc server")
	case readyErr := <-readyResults:
		if readyErr != nil {
			return errors.Wrap(readyErr, "error waiting on grpc server to be ready")
		}
	}

	// The server is ready, so the kubelet can be told about it.
	p.Logger.Info("registering with kubelet")
	registerErr := p.registerWithKubelet(serverCtx)
	if registerErr != nil {
		return errors.Wrap(registerErr, "failed to register with kubelet")
	}

	// Block until the socket goes away or the context is cancelled.
	socketGone := p.SocketWatcher.WatchSocket(serverCtx, p.Socket)
	<-socketGone
	return nil
}

// registerWithKubelet dials the kubelet socket over a unix connection (without
// transport security) and sends a device plugin registration request carrying
// the API version, the base name of the plugin socket as the endpoint, and the
// resource name. The connection is closed before returning.
func (p *Plugin) registerWithKubelet(ctx context.Context) error {
	// unixDialer makes gRPC treat the target address as a unix socket path.
	unixDialer := func(dialCtx context.Context, addr string) (net.Conn, error) {
		var dialer net.Dialer
		unixConn, dialErr := dialer.DialContext(dialCtx, "unix", addr)
		if dialErr != nil {
			return nil, errors.Wrap(dialErr, "failed to dial context")
		}
		return unixConn, nil
	}

	conn, err := grpc.Dial( //nolint:staticcheck // TODO: Move to grpc.NewClient method
		p.kubeletSocket,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(unixDialer),
	)
	if err != nil {
		return errors.Wrap(err, "error connecting to kubelet")
	}
	defer conn.Close()

	registration := &v1beta1.RegisterRequest{
		Version:      v1beta1.Version,
		Endpoint:     filepath.Base(p.Socket),
		ResourceName: p.ResourceName,
	}
	_, err = v1beta1.NewRegistrationClient(conn).Register(ctx, registration)
	if err != nil {
		return errors.Wrap(err, "error sending request to register with kubelet")
	}
	return nil
}

// mustCleanUp removes the plugin socket file. A socket that is already gone is
// not an error; any other removal failure is logged at panic level.
func (p *Plugin) mustCleanUp() {
	p.Logger.Info("cleaning up device plugin")
	removeErr := os.Remove(p.Socket)
	if removeErr == nil || os.IsNotExist(removeErr) {
		return
	}
	p.Logger.Panic("failed to remove socket", zap.Error(removeErr))
}

// CleanOldState removes every entry in the device plugin directory whose name
// starts with this plugin's socket prefix (see getSocketPrefix), i.e. sockets
// left behind by earlier runs for the same device type.
//
// It returns an error if the directory cannot be listed or if a matching entry
// cannot be removed. An entry that disappears between the listing and the
// removal is treated as already cleaned, mirroring mustCleanUp, so that a
// concurrent clean-up of the directory does not fail start-up.
func (p *Plugin) CleanOldState() error {
	entries, err := os.ReadDir(p.devicePluginDirectory)
	if err != nil {
		return errors.Wrap(err, "error listing existing device plugin sockets")
	}

	// The prefix is the same for every entry, so compute it once.
	socketPrefix := path.Base(getSocketPrefix(p.devicePluginDirectory, p.deviceType))
	for _, entry := range entries {
		if !strings.HasPrefix(entry.Name(), socketPrefix) {
			continue
		}
		f := path.Join(p.devicePluginDirectory, entry.Name())
		// The goal is for the file to be gone; "not exist" means it already is.
		if err := os.Remove(f); err != nil && !os.IsNotExist(err) {
			return errors.Wrapf(err, "error removing old socket %q", f)
		}
	}
	return nil
}

// UpdateDeviceCount sets the plugin's device count to count while holding the
// device count mutex.
func (p *Plugin) UpdateDeviceCount(count int) {
	p.deviceCountMutex.Lock()
	defer p.deviceCountMutex.Unlock()
	p.deviceCount = count
}

// getDeviceCount returns the plugin's current device count, read while holding
// the device count mutex.
func (p *Plugin) getDeviceCount() int {
	p.deviceCountMutex.Lock()
	current := p.deviceCount
	p.deviceCountMutex.Unlock()
	return current
}

// getSocketPrefix builds the path prefix shared by all sockets of a device type:
// the device plugin directory joined with the device type, where every "/" in
// the device type is replaced by "_". For example, with directory /home/foo and
// device type acn.azure.com/vnet-nic the result is /home/foo/acn.azure.com_vnet-nic
func getSocketPrefix(devicePluginDirectory string, deviceType v1alpha1.DeviceType) string {
	return path.Join(
		devicePluginDirectory,
		strings.ReplaceAll(string(deviceType), "/", "_"),
	)
}

// getSocketName returns the socket path for a device type in the form
// <socket prefix>-<current unix time in seconds>.sock
func getSocketName(devicePluginDirectory string, deviceType v1alpha1.DeviceType) string {
	prefix := getSocketPrefix(devicePluginDirectory, deviceType)
	createdAt := time.Now().Unix()
	return fmt.Sprintf("%s-%d.sock", prefix, createdAt)
}
113 changes: 113 additions & 0 deletions cns/deviceplugin/pluginmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package deviceplugin

import (
"context"
"sync"
"time"

"github.com/Azure/azure-container-networking/crd/multitenancy/api/v1alpha1"
"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// Defaults applied by NewPluginManager before any options are evaluated.
const (
	// defaultDevicePluginDirectory is the device plugin directory used unless overridden by an option.
	defaultDevicePluginDirectory = "/var/lib/kubelet/device-plugins"
	// defaultDeviceCheckInterval is the device check interval used unless overridden by an option.
	defaultDeviceCheckInterval = 5 * time.Second
)

// pluginManagerOptions collects the configurable settings of a PluginManager.
// The values are handed to every plugin created through AddPlugin.
type pluginManagerOptions struct {
	// devicePluginDirectory is the directory in which plugin sockets are created.
	devicePluginDirectory string
	// kubeletSocket is the kubelet socket the plugins register with.
	kubeletSocket string
	// deviceCheckInterval is the device check interval passed to each plugin.
	deviceCheckInterval time.Duration
}

// pluginManagerOption mutates a pluginManagerOptions value; NewPluginManager
// applies each supplied option in order on top of the defaults.
type pluginManagerOption func(*pluginManagerOptions)

// PluginManagerSocketPrefix returns an option that sets the device plugin
// directory of the plugin manager to prefix.
func PluginManagerSocketPrefix(prefix string) func(*pluginManagerOptions) {
	setDirectory := func(o *pluginManagerOptions) {
		o.devicePluginDirectory = prefix
	}
	return setDirectory
}

// PluginManagerKubeletSocket returns an option that sets the kubelet socket
// the plugins register with to socket.
func PluginManagerKubeletSocket(socket string) func(*pluginManagerOptions) {
	setKubeletSocket := func(o *pluginManagerOptions) {
		o.kubeletSocket = socket
	}
	return setKubeletSocket
}

// PluginDeviceCheckInterval returns an option that sets the device check
// interval handed to each plugin to i.
func PluginDeviceCheckInterval(i time.Duration) func(*pluginManagerOptions) {
	setInterval := func(o *pluginManagerOptions) {
		o.deviceCheckInterval = i
	}
	return setInterval
}

// PluginManager runs device plugins for vnet nics and ib nics
type PluginManager struct {
rbtr marked this conversation as resolved.
Show resolved Hide resolved
Logger *zap.Logger
plugins []*Plugin
socketWatcher *SocketWatcher
options pluginManagerOptions
mu sync.Mutex
}

// NewPluginManager creates a PluginManager with no plugins. The logger is
// tagged with the devicePlugin component, a socket watcher is created from
// that logger, and opts are applied in order on top of the default device
// plugin directory, kubelet socket and device check interval.
func NewPluginManager(l *zap.Logger, opts ...pluginManagerOption) *PluginManager {
	componentLogger := l.With(zap.String("component", "devicePlugin"))
	watcher := NewSocketWatcher(componentLogger)

	// Start from the defaults, then let the caller's options override them.
	var cfg pluginManagerOptions
	cfg.devicePluginDirectory = defaultDevicePluginDirectory
	cfg.kubeletSocket = v1beta1.KubeletSocket
	cfg.deviceCheckInterval = defaultDeviceCheckInterval
	for _, apply := range opts {
		apply(&cfg)
	}

	manager := new(PluginManager)
	manager.Logger = componentLogger
	manager.socketWatcher = watcher
	manager.options = cfg
	return manager
}

// AddPlugin creates a plugin for deviceType with the given initial device
// count, using the manager's logger, socket watcher and options, and appends
// it to the manager's plugin list. The device type doubles as the resource
// name. It returns the manager so calls can be chained.
func (pm *PluginManager) AddPlugin(deviceType v1alpha1.DeviceType, deviceCount int) *PluginManager {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	cfg := pm.options
	pm.plugins = append(pm.plugins, NewPlugin(
		pm.Logger,
		string(deviceType),
		pm.socketWatcher,
		cfg.devicePluginDirectory,
		deviceCount,
		deviceType,
		cfg.kubeletSocket,
		cfg.deviceCheckInterval,
	))
	return pm
}

// Run runs the plugin manager until the context is cancelled or error encountered.
//
// It first removes leftover state of every registered plugin and returns a
// wrapped error if any of those clean-ups fails. It then runs each plugin in
// its own goroutine and blocks until all of them have returned, after which it
// returns nil.
//
// The set of plugins is captured once, under pm.mu, when Run is called: the
// same plugins are cleaned and run even if AddPlugin is called concurrently,
// and plugins added after that point are not started by this call.
func (pm *PluginManager) Run(ctx context.Context) error {
	// Copy the list instead of holding the lock for the lifetime of Run, which
	// would block AddPlugin and TrackDevices until every plugin has stopped.
	pm.mu.Lock()
	plugins := append([]*Plugin(nil), pm.plugins...)
	pm.mu.Unlock()

	// clean up any leftover state from previous failed plugins
	// this can happen if the process crashes before it is able to clean up after itself
	for _, plugin := range plugins {
		if err := plugin.CleanOldState(); err != nil {
			return errors.Wrap(err, "error cleaning state from previous plugin process")
		}
	}

	var wg sync.WaitGroup
	for _, plugin := range plugins {
		wg.Add(1) //nolint:gomnd // in favor of readability
		go func(p *Plugin) {
			defer wg.Done()
			p.Run(ctx)
		}(plugin)
	}

	wg.Wait()
	return nil
}

func (pm *PluginManager) TrackDevices(deviceType v1alpha1.DeviceType, count int) {
pm.mu.Lock()
defer pm.mu.Unlock()
for _, plugin := range pm.plugins {
if plugin.deviceType == deviceType {
plugin.UpdateDeviceCount(count)
break
rbtr marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Loading
Loading