Skip to content

Commit

Permalink
Add option to serialize formatAndMount
Browse files Browse the repository at this point in the history
Change-Id: Ieff9b45a413c6df67d7199de6845132ff3b1cd2f
  • Loading branch information
mattcary committed Jul 20, 2023
1 parent d14b457 commit 75f5cbf
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 44 deletions.
6 changes: 6 additions & 0 deletions cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ var (
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")

serializeFormatAndMount = flag.Bool("serialize-format-and-mount", false, "If set then format and mount operations are serialized on each node")
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started")

version string
)

Expand Down Expand Up @@ -153,6 +156,9 @@ func handle() {
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
if *serializeFormatAndMount {
nodeServer = nodeServer.WithSerializedFormatAndMount(*serializeFormatAndMountTimeout)
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer)
Expand Down
128 changes: 92 additions & 36 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"strings"
"time"
neturl "net/url"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
csi "github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -138,6 +139,14 @@ const (

// Keys in the volume context.
contextForceAttach = "force-attach"

resourceApiScheme = "http"
resourceApiService = "compute"
resourceProject = "project"
)

var (
resourceApiHost = regexp.Compile("^www.*apis.com$")
)

func isDiskReady(disk *gce.CloudDisk) (bool, error) {
Expand Down Expand Up @@ -434,7 +443,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
}

klog.V(4).Infof("CreateVolume succeeded for disk %v", volKey)
return generateCreateVolumeResponse(disk, zones, params), nil
return generateCreateVolumeResponse(disk, zones, params)

}

Expand Down Expand Up @@ -869,13 +878,23 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
entries := []*csi.ListVolumesResponse_Entry{}
for i := 0; i+offset < len(gceCS.disks) && i < maxEntries; i++ {
d := gceCS.disks[i+offset]
diskRsrc, err := getResourceId(d.SelfLink)
if err != nil {
klog.Warning("Bad ListVolumes disk resource %s, skipped: %v", d.SelfLink, err)
continue
}
users := []string{}
for _, u := range d.Users {
users = append(users, cleanSelfLink(u))
rsrc, err := getResourceId(u)
if err != nil {
klog.Warning("Bad ListVolumes user %s, skipped: %v", u, err)
} else {
users = append(users, rsrc)
}
}
entries = append(entries, &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: cleanSelfLink(d.SelfLink),
VolumeId: diskRsrc,
},
Status: &csi.ListVolumesResponse_VolumeStatus{
PublishedNodeIds: users,
Expand Down Expand Up @@ -988,6 +1007,10 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project
return nil, common.LoggedError("Failed to create snapshot: ", err)
}
}
snapshotId, err := getResourceId(snapshot.SelfLink)
if err != nil {
return nil, common.LoggedError(fmt.Sprintf("Cannot extract resource id from snapshot %s", snapshot.SelfLink), err)
}

err = gceCS.validateExistingSnapshot(snapshot, volKey)
if err != nil {
Expand All @@ -1006,7 +1029,7 @@ func (gceCS *GCEControllerServer) createPDSnapshot(ctx context.Context, project

return &csi.Snapshot{
SizeBytes: common.GbToBytes(snapshot.DiskSizeGb),
SnapshotId: cleanSelfLink(snapshot.SelfLink),
SnapshotId: SnapshotId,
SourceVolumeId: volumeID,
CreationTime: timestamp,
ReadyToUse: ready,
Expand Down Expand Up @@ -1035,6 +1058,10 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin
return nil, common.LoggedError("Failed to create image: ", err)
}
}
imageId, err := getResourceId(image.SelfLink)
if err != nil {
return nil, common.LoggedError(fmt.Sprintf("Cannot extract resource id from snapshot %s", image.SelfLink), err)
}

err = gceCS.validateExistingImage(image, volKey)
if err != nil {
Expand All @@ -1053,7 +1080,7 @@ func (gceCS *GCEControllerServer) createImage(ctx context.Context, project strin

return &csi.Snapshot{
SizeBytes: common.GbToBytes(image.DiskSizeGb),
SnapshotId: cleanSelfLink(image.SelfLink),
SnapshotId: imageId,
SourceVolumeId: volumeID,
CreationTime: timestamp,
ReadyToUse: ready,
Expand All @@ -1065,9 +1092,13 @@ func (gceCS *GCEControllerServer) validateExistingImage(image *compute.Image, vo
return fmt.Errorf("disk does not exist")
}

_, sourceKey, err := common.VolumeIDToKey(cleanSelfLink(image.SourceDisk))
sourceId, err := getResourceId(image.SourceDisk)
if err != nil {
return fmt.Errorf("failed to get source id from %s: %w", image.SourceDisk, err)
}
_, sourceKey, err := common.VolumeIDToKey(sourceId)
if err != nil {
return fmt.Errorf("fail to get source disk key %s, %w", image.SourceDisk, err)
return fmt.Errorf("failed to get source disk key %s: %w", image.SourceDisk, err)
}

if sourceKey.String() != volKey.String() {
Expand Down Expand Up @@ -1116,7 +1147,11 @@ func (gceCS *GCEControllerServer) validateExistingSnapshot(snapshot *compute.Sna
return fmt.Errorf("disk does not exist")
}

_, sourceKey, err := common.VolumeIDToKey(cleanSelfLink(snapshot.SourceDisk))
sourceId, err := getResourceId(snapshot.SourceDisk)
if err != nil {
retur fmt.Errorf("failed to get source id from %s: %w", snapshot.SourceDisk, err)
}
_, sourceKey, err := common.VolumeIDToKey(sourceId)
if err != nil {
return fmt.Errorf("fail to get source disk key %s, %w", snapshot.SourceDisk, err)
}
Expand Down Expand Up @@ -1350,7 +1385,7 @@ func (gceCS *GCEControllerServer) getSnapshotByID(ctx context.Context, snapshotI
return &csi.ListSnapshotsResponse{}, nil
}
}
e, err := generateImageEntry(image)
e, err := generateDiskImageEntry(image)
if err != nil {
return nil, fmt.Errorf("failed to generate image entry: %w", err)
}
Expand All @@ -1372,6 +1407,15 @@ func generateDiskSnapshotEntry(snapshot *compute.Snapshot) (*csi.ListSnapshotsRe
return nil, fmt.Errorf("Failed to covert creation timestamp: %w", err)
}

snapshotId, err := getResourceId(snapshot.SelfLink)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot id from %s: %w", snapshot.SelfLink, err)
}
sourceId, err := getResourceId(snapshot.SourceDisk)
if err != nil {
return nil, fmt.Errorf("failed to get source id from %s: %w", snapshot.SourceDisk, err)
}

// We ignore the error intentionally here since we are just listing snapshots
// TODO: If the snapshot is in "FAILED" state we need to think through what this
// should actually look like.
Expand All @@ -1380,8 +1424,8 @@ func generateDiskSnapshotEntry(snapshot *compute.Snapshot) (*csi.ListSnapshotsRe
entry := &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: common.GbToBytes(snapshot.DiskSizeGb),
SnapshotId: cleanSelfLink(snapshot.SelfLink),
SourceVolumeId: cleanSelfLink(snapshot.SourceDisk),
SnapshotId: snapshotId,
SourceVolumeId: sourceId,
CreationTime: tp,
ReadyToUse: ready,
},
Expand All @@ -1397,35 +1441,23 @@ func generateDiskImageEntry(image *compute.Image) (*csi.ListSnapshotsResponse_En
return nil, fmt.Errorf("failed to covert creation timestamp: %w", err)
}

ready, _ := isImageReady(image.Status)

entry := &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: common.GbToBytes(image.DiskSizeGb),
SnapshotId: cleanSelfLink(image.SelfLink),
SourceVolumeId: cleanSelfLink(image.SourceDisk),
CreationTime: tp,
ReadyToUse: ready,
},
imageId, err := getResourceId(image.SelfLink)
if err != nil {
return nil, fmt.Errorf("cannot get image id from %s: %w", image.SelfLink, err)
}
return entry, nil
}

func generateImageEntry(image *compute.Image) (*csi.ListSnapshotsResponse_Entry, error) {
timestamp, err := parseTimestamp(image.CreationTimestamp)
sourceId, err := getResourceId(image.SourceDisk)
if err != nil {
return nil, fmt.Errorf("Failed to covert creation timestamp: %w", err)
return nil, fmt.Errorf("cannot get sourcee id from %s: %w", image.SourceLink, err)
}

// ignore the error intentionally here since we are just listing images
ready, _ := isImageReady(image.Status)

entry := &csi.ListSnapshotsResponse_Entry{
Snapshot: &csi.Snapshot{
SizeBytes: common.GbToBytes(image.DiskSizeGb),
SnapshotId: cleanSelfLink(image.SelfLink),
SourceVolumeId: cleanSelfLink(image.SourceDisk),
CreationTime: timestamp,
SnapshotId: imageId,
SourceVolumeId: sourceId,
CreationTime: tp,
ReadyToUse: ready,
},
}
Expand Down Expand Up @@ -1691,7 +1723,12 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) {
return info, nil
}

func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params common.DiskParameters) *csi.CreateVolumeResponse {
func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params common.DiskParameters) *csi.CreateVolumeResponse, error {
volumeId, err := getResourceId(disk.GetSelfLink())
if err != nil {
return nil, fmt.Errorf("cannot get volume id from %s: %w", disk.GetSelfLink(), err)
}

tops := []*csi.Topology{}
for _, zone := range zones {
tops = append(tops, &csi.Topology{
Expand All @@ -1702,7 +1739,7 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params co
createResp := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: realDiskSizeBytes,
VolumeId: cleanSelfLink(disk.GetSelfLink()),
VolumeId: volumeId,
VolumeContext: paramsToVolumeContext(params),
AccessibleTopology: tops,
},
Expand Down Expand Up @@ -1744,9 +1781,28 @@ func generateCreateVolumeResponse(disk *gce.CloudDisk, zones []string, params co
return createResp
}

func cleanSelfLink(selfLink string) string {
r, _ := regexp.Compile("https:\\/\\/www.*apis.com\\/.*(v1|beta|alpha)\\/")
return r.ReplaceAllString(selfLink, "")
func getResourceId(resourceLink string) (string, error) {
url, err := neturl.Parse(resourceLink)
if err != nil {
return fmt.Errorf("Could not parse resource %s: %w", resourceLink, err)
}
if url.Scheme != resourceApiScheme {
return fmt.Errorf("Unexpected API scheme for resource %s", resourceLink)
}
if !resourceApiHost.MatchString(url.Host) {
return fmt.Errorf("Unexpected API host for resource %s", resourceLink)
}
elts := strings.Split(url.Path, "/")
if len(elts) < 3 {
return fmt.Errorf("Bad resource path %s", resourceLink)
}
if elts[0] != resourceApiService {
return fmt.Errorf("Bad resource service %s in %s", elts[0], resourceLink)
}
if elts[2] != resourceProject {
return fmt.Errorf("Expected %s to start with %s in resource %s", elts[2:], resourceProject, resourceLink)
}
return elts[2:]
}

func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool) (*gce.CloudDisk, error) {
Expand Down
20 changes: 18 additions & 2 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"runtime"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -46,6 +47,15 @@ type GCENodeServer struct {
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *common.VolumeLocks

// If set, this semaphore will be used to serialize formatAndMount. It will be raised
// when the operation starts, and lowered either when finished, or when
// formatAndMountTimeout has expired.
//
// This is used only on linux (where memory problems for concurrent fsck and mkfs have
// been observed).
formatAndMountSemaphore chan any
formatAndMountTimeout time.Duration
}

var _ csi.NodeServer = &GCENodeServer{}
Expand Down Expand Up @@ -86,6 +96,12 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
return false
}

func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration) *GCENodeServer {
ns.formatAndMountSemaphore = make(chan any, 1)
ns.formatAndMountTimeout = timeout
return ns
}

func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// Validate Arguments
targetPath := req.GetTargetPath()
Expand Down Expand Up @@ -318,7 +334,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro")
}

err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
if err != nil {
// If a volume is created from a content source like snapshot or cloning, the filesystem might get marked
// as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying
Expand All @@ -329,7 +345,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload")

options = append(options, "noload")
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
if err == nil {
klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
Expand Down
31 changes: 27 additions & 4 deletions pkg/gce-pd-csi-driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"k8s.io/utils/exec"
testingexec "k8s.io/utils/exec/testing"
Expand Down Expand Up @@ -58,7 +59,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
return gceDriver
}

func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
Expand All @@ -69,6 +70,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *
return gceDriver
}

func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter)).WithSerializedFormatAndMount(5 * time.Second)

err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nodeServer)
if err != nil {
t.Fatalf("Failed to setup GCE Driver: %v", err)
}
return gceDriver
}

func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction {
c := cmd
a := args
Expand Down Expand Up @@ -849,9 +862,7 @@ func TestNodeGetCapabilities(t *testing.T) {
}
}

func TestConcurrentNodeOperations(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) {
ns := gceDriver.ns
tempDir, err := ioutil.TempDir("", "cno")
if err != nil {
Expand Down Expand Up @@ -931,3 +942,15 @@ func TestConcurrentNodeOperations(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

func TestBlockingMount(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute)
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
}

func TestBlockingFormatAndMount(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute)
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
}
Loading

0 comments on commit 75f5cbf

Please sign in to comment.