Skip to content

Commit

Permalink
Add option to serialize formatAndMount
Browse files Browse the repository at this point in the history
Change-Id: Ieff9b45a413c6df67d7199de6845132ff3b1cd2f
  • Loading branch information
mattcary committed Jul 20, 2023
1 parent d14b457 commit 1d17763
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 13 deletions.
6 changes: 6 additions & 0 deletions cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ var (
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")

// maxConcurrentFormatAndMount bounds how many format-and-mount operations may run
// at once on a node; the limit is only installed when the value is positive.
maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "The maximum number of concurrent format and mount operations on each node; the default of 1 serializes them and a value of 0 or less disables the limit. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
// formatAndMountTimeout is how long one operation may hold its concurrency slot
// before another operation is allowed to start.
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --max-concurrent-format-and-mount is greater than 0")

version string
)

Expand Down Expand Up @@ -153,6 +156,9 @@ func handle() {
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
if *maxConcurrentFormatAndMount > 0 {
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer)
Expand Down
22 changes: 20 additions & 2 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"runtime"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -46,6 +47,15 @@ type GCENodeServer struct {
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *common.VolumeLocks

// If set, this semaphore will be used to serialize formatAndMount. It will be raised
// when the operation starts, and lowered either when finished, or when
// formatAndMountTimeout has expired.
//
// This is used only on linux (where memory problems for concurrent fsck and mkfs have
// been observed).
formatAndMountSemaphore chan any
formatAndMountTimeout time.Duration
}

var _ csi.NodeServer = &GCENodeServer{}
Expand Down Expand Up @@ -86,6 +96,14 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
return false
}

// WithSerializedFormatAndMount enables the formatAndMount concurrency limit on ns.
//
// When maxConcurrent is positive, a semaphore channel with that capacity is
// installed and timeout is stored alongside it. A non-positive maxConcurrent
// leaves ns untouched. The receiver is returned so the call can be chained
// onto the constructor.
func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration, maxConcurrent int) *GCENodeServer {
	if maxConcurrent <= 0 {
		return ns
	}
	ns.formatAndMountTimeout = timeout
	ns.formatAndMountSemaphore = make(chan any, maxConcurrent)
	return ns
}

func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// Validate Arguments
targetPath := req.GetTargetPath()
Expand Down Expand Up @@ -318,7 +336,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro")
}

err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
if err != nil {
// If a volume is created from a content source like snapshot or cloning, the filesystem might get marked
// as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying
Expand All @@ -329,7 +347,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload")

options = append(options, "noload")
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
if err == nil {
klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
Expand Down
31 changes: 27 additions & 4 deletions pkg/gce-pd-csi-driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"k8s.io/utils/exec"
testingexec "k8s.io/utils/exec/testing"
Expand Down Expand Up @@ -58,7 +59,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
return gceDriver
}

func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
Expand All @@ -69,6 +70,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *
return gceDriver
}

// getTestBlockingFormatAndMountGCEDriver returns a GCE driver whose node server
// uses a fake blocking mounter driven by readyToExecute and has the
// formatAndMount limit enabled (one concurrent operation, 5 second timeout).
// The calling test is failed immediately if the driver cannot be set up.
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
	gceDriver := GetGCEDriver()
	blockingMounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
	nodeServer := NewNodeServer(
		gceDriver,
		blockingMounter,
		deviceutils.NewFakeDeviceUtils(false),
		metadataservice.NewFakeService(),
		mountmanager.NewFakeStatter(blockingMounter),
	)
	nodeServer = nodeServer.WithSerializedFormatAndMount(5*time.Second, 1)

	if err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nodeServer); err != nil {
		t.Fatalf("Failed to setup GCE Driver: %v", err)
	}
	return gceDriver
}

func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction {
c := cmd
a := args
Expand Down Expand Up @@ -849,9 +862,7 @@ func TestNodeGetCapabilities(t *testing.T) {
}
}

func TestConcurrentNodeOperations(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) {
ns := gceDriver.ns
tempDir, err := ioutil.TempDir("", "cno")
if err != nil {
Expand Down Expand Up @@ -931,3 +942,15 @@ func TestConcurrentNodeOperations(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

func TestBlockingMount(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute)
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
}

func TestBlockingFormatAndMount(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute)
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
}
29 changes: 28 additions & 1 deletion pkg/gce-pd-csi-driver/utils_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"strconv"
"strings"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -47,7 +48,33 @@ func getDevicePath(ns *GCENodeServer, volumeID, partition string) (string, error
return devicePath, nil
}

func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
// formatAndMount delegates to m.FormatAndMount, optionally gated by the node
// server's formatAndMount semaphore.
//
// When no semaphore is configured the call goes straight through. Otherwise a
// slot is acquired first (blocking while the semaphore is full), and a helper
// goroutine gives the slot back as soon as either this call returns or
// ns.formatAndMountTimeout elapses, whichever comes first.
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
	if ns.formatAndMountSemaphore == nil {
		return m.FormatAndMount(source, target, fstype, options)
	}

	// Closed when this function returns, which tells the releaser goroutine that
	// the operation has finished.
	finished := make(chan struct{})
	defer close(finished)

	// Acquire a slot. This blocks while other formatAndMount calls occupy every
	// slot of the semaphore channel.
	ns.formatAndMountSemaphore <- struct{}{}

	go func() {
		// The slot is released on timeout even if formatAndMount is still
		// working, so that one stuck operation cannot stall every other volume
		// on the node. The motivation for this serialization is to reduce memory
		// usage; if stuck processes cause OOMs then the containers will be
		// killed and restarted, including the stuck threads, and with any luck
		// progress resumes.
		timer := time.NewTimer(ns.formatAndMountTimeout)
		select {
		case <-finished:
		case <-timer.C:
		}
		timer.Stop()

		// Release the slot.
		<-ns.formatAndMountSemaphore
	}()

	return m.FormatAndMount(source, target, fstype, options)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/gce-pd-csi-driver/utils_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
mounter "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)

func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
if !strings.EqualFold(fstype, defaultWindowsFsType) {
return fmt.Errorf("GCE PD CSI driver can only supports %s file system, it does not support %s", defaultWindowsFsType, fstype)
}
Expand Down
12 changes: 7 additions & 5 deletions test/e2e/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri
binPath := path.Join(pkgPath, "bin/gce-pd-csi-driver")

endpoint := fmt.Sprintf("tcp://localhost:%s", port)
computeFlag := ""
extra_flags := []string{
fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue),
"--max-concurrent-format-and-mount=10", // otherwise the serialization times out.
}
if computeEndpoint != "" {
computeFlag = fmt.Sprintf("--compute-endpoint %s", computeEndpoint)
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint %s", computeEndpoint))
}

workspace := remote.NewWorkspaceDir("gce-pd-e2e-")
// Log at V(6) as the compute API calls are emitted at that level and it's
// useful to see what's happening when debugging tests.
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s --extra-labels=%s=%s 2> %s/prog.out < /dev/null > /dev/null &'",
workspace, endpoint, computeFlag, DiskLabelKey, DiskLabelValue, workspace)
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s 2> %s/prog.out < /dev/null > /dev/null &'",
workspace, endpoint, strings.Join(extra_flags, " "), workspace)

config := &remote.ClientConfig{
PkgPath: pkgPath,
Expand Down

0 comments on commit 1d17763

Please sign in to comment.