Skip to content

Commit

Permalink
Merge pull request #167 from nesv/driver.volumes.limit
Browse files Browse the repository at this point in the history
pkg/linode-bs: Do not persist volume attachments across boots
  • Loading branch information
Nick Saika authored May 28, 2024
2 parents 55b73a9 + 3a45f7f commit 06681b2
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 39 deletions.
84 changes: 70 additions & 14 deletions pkg/linode-bs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,19 +258,34 @@ func (linodeCS *LinodeControllerServer) ControllerPublishVolume(ctx context.Cont
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with id %d already attached to node %d", volumeID, *volume.LinodeID))
}

if _, err := linodeCS.CloudProvider.GetInstance(ctx, linodeID); err != nil {
if apiErr, ok := err.(*linodego.Error); ok && apiErr.Code == 404 {
return nil, status.Error(codes.NotFound, fmt.Sprintf("Linode with id %d not found", linodeID))
}
instance, err := linodeCS.CloudProvider.GetInstance(ctx, linodeID)
if err, ok := err.(*linodego.Error); ok && err.Code == 404 {
return nil, status.Errorf(codes.NotFound, "Linode with id %d not found", linodeID)
} else if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

opts := &linodego.VolumeAttachOptions{
LinodeID: linodeID,
ConfigID: 0,
// Check to see if there is room to attach this volume to the instance.
if canAttach, err := linodeCS.canAttach(ctx, instance); err != nil {
return &csi.ControllerPublishVolumeResponse{}, status.Error(codes.Internal, err.Error())
} else if !canAttach {
limit := linodeCS.maxVolumeAttachments(instance)
return &csi.ControllerPublishVolumeResponse{}, status.Errorf(codes.ResourceExhausted, "max number of volumes (%d) already attached to instance", limit)
}

if _, err := linodeCS.CloudProvider.AttachVolume(ctx, volumeID, opts); err != nil {
// Whether or not the volume attachment should be persisted across
// boots.
//
// Setting this to true will limit the maximum number of attached
// volumes to 8 (eight), minus any instance disks, since volume
// attachments get persisted by adding them to the instance's boot
// config.
persist := false

if _, err := linodeCS.CloudProvider.AttachVolume(ctx, volumeID, &linodego.VolumeAttachOptions{
LinodeID: linodeID,
PersistAcrossBoots: &persist,
}); err != nil {
retCode := codes.Internal
if apiErr, ok := err.(*linodego.Error); ok && strings.Contains(apiErr.Message, "is already attached") {
retCode = codes.Unavailable // Allow a retry if the volume is already attached: race condition can occur here
Expand All @@ -289,6 +304,47 @@ func (linodeCS *LinodeControllerServer) ControllerPublishVolume(ctx context.Cont
return &csi.ControllerPublishVolumeResponse{PublishContext: pvInfo}, nil
}

// canAttach reports whether one more block storage volume can be attached to
// the given Linode instance.
//
// The decision compares the total number of drives currently attached —
// instance disks plus block storage volumes — against the per-instance
// attachment limit derived from the instance's specs.
func (s *LinodeControllerServer) canAttach(ctx context.Context, instance *linodego.Instance) (canAttach bool, err error) {
	disks, err := s.CloudProvider.ListInstanceDisks(ctx, instance.ID, nil)
	if err != nil {
		return false, fmt.Errorf("list instance disks: %w", err)
	}

	volumes, err := s.CloudProvider.ListInstanceVolumes(ctx, instance.ID, nil)
	if err != nil {
		return false, fmt.Errorf("list instance volumes: %w", err)
	}

	// Every attached drive, disk or volume, counts against the limit.
	inUse := len(disks) + len(volumes)
	return inUse < s.maxVolumeAttachments(instance), nil
}

// maxVolumeAttachments returns the maximum number of volumes that can be
// attached to a single Linode instance. If instance == nil, or instance.Specs
// == nil, [maxPersistentAttachments] will be returned.
func (s *LinodeControllerServer) maxVolumeAttachments(instance *linodego.Instance) int {
	if instance == nil || instance.Specs == nil {
		// No specs means no memory figure to scale by, so fall back to
		// the conservative persistent-attachment limit.
		return maxPersistentAttachments
	}

	// The API reports instance memory in MB; the shared helper expects
	// bytes, so scale up before delegating.
	return maxVolumeAttachments(uint(instance.Specs.Memory) << 20)
}

// ControllerUnpublishVolume detaches the given volume from the node
func (linodeCS *LinodeControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
volumeID, statusErr := common.VolumeIdAsInt("ControllerUnpublishVolume", req)
Expand Down Expand Up @@ -509,7 +565,6 @@ func (linodeCS *LinodeControllerServer) ControllerExpandVolume(ctx context.Conte
}

vol, err = linodeCS.CloudProvider.WaitForVolumeStatus(ctx, vol.ID, linodego.VolumeActive, waitTimeout)

if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand All @@ -522,12 +577,12 @@ func (linodeCS *LinodeControllerServer) ControllerExpandVolume(ctx context.Conte
}
klog.V(4).Info("volume is resized")
return resp, nil

}

// attemptGetContentSourceVolume attempts to get information about the Linode volume to clone from.
func (linodeCS *LinodeControllerServer) attemptGetContentSourceVolume(
ctx context.Context, contentSource *csi.VolumeContentSource) (*common.LinodeVolumeKey, error) {
ctx context.Context, contentSource *csi.VolumeContentSource,
) (*common.LinodeVolumeKey, error) {
// No content source was defined; no clone operation
if contentSource == nil {
return nil, nil
Expand Down Expand Up @@ -598,8 +653,8 @@ func (linodeCS *LinodeControllerServer) attemptCreateLinodeVolume(

// createLinodeVolume creates a Linode volume and returns the result
func (linodeCS *LinodeControllerServer) createLinodeVolume(
ctx context.Context, label string, sizeGB int, tags string) (*linodego.Volume, error) {

ctx context.Context, label string, sizeGB int, tags string,
) (*linodego.Volume, error) {
volumeReq := linodego.VolumeCreateOptions{
Region: linodeCS.MetadataService.GetZone(),
Label: label,
Expand All @@ -625,7 +680,8 @@ func (linodeCS *LinodeControllerServer) createLinodeVolume(

// cloneLinodeVolume clones a Linode volume and returns the result
func (linodeCS *LinodeControllerServer) cloneLinodeVolume(
ctx context.Context, label string, sizeGB, sourceID int) (*linodego.Volume, error) {
ctx context.Context, label string, sizeGB, sourceID int,
) (*linodego.Volume, error) {
klog.V(4).Infoln("cloning volume", map[string]interface{}{
"source_vol_id": sourceID,
})
Expand Down
188 changes: 186 additions & 2 deletions pkg/linode-bs/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/linode/linode-blockstorage-csi-driver/pkg/common"
linodeclient "github.com/linode/linode-blockstorage-csi-driver/pkg/linode-client"
"github.com/linode/linodego"
)

Expand Down Expand Up @@ -179,44 +180,60 @@ func TestListVolumes(t *testing.T) {
}
}
})

}

}

// Compile-time check that fakeLinodeClient satisfies the LinodeClient
// interface consumed by the controller server.
var _ linodeclient.LinodeClient = &fakeLinodeClient{}

// fakeLinodeClient is a stub LinodeClient for tests. The list methods return
// the configured fixtures below; all other methods are no-ops.
type fakeLinodeClient struct {
	volumes  []linodego.Volume       // returned by ListVolumes and ListInstanceVolumes
	disks    []linodego.InstanceDisk // returned by ListInstanceDisks
	throwErr bool                    // when true, ListVolumes returns an error (other methods ignore it)
}

// The methods below are the fakeLinodeClient's implementation of
// linodeclient.LinodeClient. Receiver name is "flc" on every method for
// consistency (two methods previously used "c").

func (flc *fakeLinodeClient) ListInstances(context.Context, *linodego.ListOptions) ([]linodego.Instance, error) {
	return nil, nil
}

// ListVolumes returns the configured volume fixtures, or an error when
// throwErr is set.
func (flc *fakeLinodeClient) ListVolumes(context.Context, *linodego.ListOptions) ([]linodego.Volume, error) {
	if flc.throwErr {
		return nil, errors.New("sad times mate")
	}
	return flc.volumes, nil
}

// ListInstanceVolumes returns the configured volume fixtures regardless of the
// instance ID.
func (flc *fakeLinodeClient) ListInstanceVolumes(_ context.Context, _ int, _ *linodego.ListOptions) ([]linodego.Volume, error) {
	return flc.volumes, nil
}

// ListInstanceDisks returns the configured disk fixtures regardless of the
// instance ID.
func (flc *fakeLinodeClient) ListInstanceDisks(_ context.Context, _ int, _ *linodego.ListOptions) ([]linodego.InstanceDisk, error) {
	return flc.disks, nil
}

func (flc *fakeLinodeClient) GetInstance(context.Context, int) (*linodego.Instance, error) {
	return nil, nil
}

func (flc *fakeLinodeClient) GetVolume(context.Context, int) (*linodego.Volume, error) {
	return nil, nil
}

func (flc *fakeLinodeClient) CreateVolume(context.Context, linodego.VolumeCreateOptions) (*linodego.Volume, error) {
	return nil, nil
}

func (flc *fakeLinodeClient) CloneVolume(context.Context, int, string) (*linodego.Volume, error) {
	return nil, nil
}

func (flc *fakeLinodeClient) AttachVolume(context.Context, int, *linodego.VolumeAttachOptions) (*linodego.Volume, error) {
	return nil, nil
}

func (flc *fakeLinodeClient) DetachVolume(context.Context, int) error { return nil }

func (flc *fakeLinodeClient) WaitForVolumeLinodeID(context.Context, int, *int, int) (*linodego.Volume, error) {
	return nil, nil
}

func (flc *fakeLinodeClient) WaitForVolumeStatus(context.Context, int, linodego.VolumeStatus, int) (*linodego.Volume, error) {
	return nil, nil
}
Expand All @@ -229,3 +246,170 @@ func (flc *fakeLinodeClient) NewEventPoller(context.Context, any, linodego.Entit
// createLinodeID returns a pointer to a copy of i. It is a convenience for
// populating optional *int fields in test fixtures.
func createLinodeID(i int) *int {
	id := i
	return &id
}

// TestControllerCanAttach exercises canAttach against a fake client that is
// pre-loaded with a configurable number of attached volumes and disks.
func TestControllerCanAttach(t *testing.T) {
	t.Parallel()

	tests := []struct {
		memory uint // memory in bytes
		nvols  int  // number of volumes already attached
		ndisks int  // number of attached disks
		want   bool // can we attach another?
		fail   bool // should we expect a non-nil error
	}{
		{
			memory: 1 << 30, // 1GiB
			nvols:  7,       // maxed out
			ndisks: 1,
		},
		{
			memory: 16 << 30, // 16GiB
			nvols:  14,       // should allow one more
			ndisks: 1,
			want:   true,
		},
		{
			memory: 16 << 30,
			nvols:  15,
			ndisks: 1,
		},
		{
			memory: 256 << 30, // 256GiB
			nvols:  64,        // maxed out
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for _, tt := range tests {
		tname := fmt.Sprintf("%dGB-%d", tt.memory>>30, tt.nvols)
		t.Run(tname, func(t *testing.T) {
			// Fabricate the requested number of pre-attached volumes
			// and disks; only the counts matter to canAttach.
			vols := make([]linodego.Volume, tt.nvols)
			for i := range vols {
				vols[i] = linodego.Volume{ID: i}
			}
			disks := make([]linodego.InstanceDisk, tt.ndisks)
			for i := range disks {
				disks[i] = linodego.InstanceDisk{ID: i}
			}

			// Default to 8GiB when the case leaves memory unset.
			memMB := 8192
			if tt.memory != 0 {
				memMB = int(tt.memory >> 20) // convert bytes -> MB
			}

			srv := LinodeControllerServer{
				CloudProvider: &fakeLinodeClient{
					volumes: vols,
					disks:   disks,
				},
			}

			got, err := srv.canAttach(ctx, &linodego.Instance{
				Specs: &linodego.InstanceSpec{Memory: memMB},
			})
			switch {
			case err != nil && !tt.fail:
				t.Fatal(err)
			case err == nil && tt.fail:
				t.Fatal("should have failed")
			}

			if got != tt.want {
				t.Errorf("got=%t want=%t", got, tt.want)
			}
		})
	}
}

// testMetadata is an implementation of pkg/metadata.MetadataService that can
// be used by the [LinodeControllerServer] to retrieve metadata about the
// current Linode instance.
//
// NOTE(review): nothing in this diff references testMetadata yet — hence the
// //nolint:unused directives; confirm it is picked up by other tests.
//
//nolint:unused
type testMetadata struct {
	region  string // returned by GetZone
	nodeID  int    // returned by GetNodeID
	label   string // returned by GetName
	project string // returned by GetProject
	memory  uint   // Amount of memory, in bytes; returned by Memory
}

// GetZone returns the configured region.
//
//nolint:unused
func (m *testMetadata) GetZone() string { return m.region }

// GetProject returns the configured project.
//
//nolint:unused
func (m *testMetadata) GetProject() string { return m.project }

// GetName returns the configured instance label.
//
//nolint:unused
func (m *testMetadata) GetName() string { return m.label }

// GetNodeID returns the configured node ID.
//
//nolint:unused
func (m *testMetadata) GetNodeID() int { return m.nodeID }

// Memory returns the configured amount of memory, in bytes.
//
//nolint:unused
func (m *testMetadata) Memory() uint { return m.memory }

// TestControllerMaxVolumeAttachments checks the memory-based attachment-limit
// calculation, including the nil-instance and nil-spec fallbacks.
func TestControllerMaxVolumeAttachments(t *testing.T) {
	// withMemory builds an instance whose spec reports the given amount
	// of memory, in MB (the unit the Linode API uses).
	withMemory := func(mb int) *linodego.Instance {
		return &linodego.Instance{Specs: &linodego.InstanceSpec{Memory: mb}}
	}

	tests := []struct {
		name     string
		instance *linodego.Instance
		want     int
	}{
		{name: "NilInstance", want: maxPersistentAttachments},
		{name: "NilInstanceSpecs", instance: &linodego.Instance{}, want: maxPersistentAttachments},
		{name: "1GB", instance: withMemory(1 << 10), want: maxPersistentAttachments},
		{name: "16GB", instance: withMemory(16 << 10), want: 16},
		{name: "32GB", instance: withMemory(32 << 10), want: 32},
		{name: "64GB", instance: withMemory(64 << 10), want: maxAttachments},
		{name: "96GB", instance: withMemory(96 << 10), want: maxAttachments},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var s LinodeControllerServer
			if got := s.maxVolumeAttachments(tt.instance); got != tt.want {
				t.Errorf("got=%d want=%d", got, tt.want)
			}
		})
	}
}
Loading

0 comments on commit 06681b2

Please sign in to comment.