Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to serialize formatAndMount #1294

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ var (
maxConcurrentFormat = flag.Int("max-concurrent-format", 1, "The maximum number of concurrent format exec calls")
concurrentFormatTimeout = flag.Duration("concurrent-format-timeout", 1*time.Minute, "The maximum duration of a format operation before its concurrency token is released")

// maxConcurrentFormatAndMount bounds how many format and mount operations may run at once on a node; handle() only enables the limit when the value is greater than zero.
maxConcurrentFormatAndMount = flag.Int("max-concurrent-format-and-mount", 1, "The maximum number of concurrent format and mount operations on each node; a value of zero or less disables the limit. This is stronger than max-concurrent-format as it includes fsck and other mount operations")
// formatAndMountTimeout is how long a single format and mount operation may hold its concurrency slot before the slot is handed to the next operation.
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --max-concurrent-format-and-mount is greater than zero")

version string
)

Expand Down Expand Up @@ -153,6 +156,9 @@ func handle() {
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
}
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter)
if *maxConcurrentFormatAndMount > 0 {
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, identityServer, controllerServer, nodeServer)
Expand Down
22 changes: 20 additions & 2 deletions pkg/gce-pd-csi-driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"runtime"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -46,6 +47,15 @@ type GCENodeServer struct {
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *common.VolumeLocks

// If set, this semaphore will be used to serialize formatAndMount. It will be raised
// when the operation starts, and lowered either when finished, or when
// formatAndMountTimeout has expired.
//
// This is used only on linux (where memory problems for concurrent fsck and mkfs have
// been observed).
formatAndMountSemaphore chan any
formatAndMountTimeout time.Duration
}

var _ csi.NodeServer = &GCENodeServer{}
Expand Down Expand Up @@ -86,6 +96,14 @@ func (ns *GCENodeServer) isVolumePathMounted(path string) bool {
return false
}

// WithSerializedFormatAndMount limits how many formatAndMount operations ns runs
// concurrently. When maxConcurrent is positive it installs a semaphore with that many
// slots and records timeout as the maximum time a single operation may hold a slot;
// otherwise ns is left unchanged. The receiver is modified in place and returned so
// the call can be chained onto the constructor.
func (ns *GCENodeServer) WithSerializedFormatAndMount(timeout time.Duration, maxConcurrent int) *GCENodeServer {
	if maxConcurrent <= 0 {
		// Nothing to limit: leave the semaphore nil so formatAndMount runs unrestricted.
		return ns
	}
	ns.formatAndMountTimeout = timeout
	ns.formatAndMountSemaphore = make(chan any, maxConcurrent)
	return ns
}

func (ns *GCENodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// Validate Arguments
targetPath := req.GetTargetPath()
Expand Down Expand Up @@ -318,7 +336,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
klog.V(4).Infof("CSI volume is read-only, mounting with extra option ro")
}

err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
if err != nil {
// If a volume is created from a content source like snapshot or cloning, the filesystem might get marked
// as "dirty" even if it is otherwise consistent and ext3/4 will try to restore to a consistent state by replaying
Expand All @@ -329,7 +347,7 @@ func (ns *GCENodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStage
klog.V(4).Infof("Failed to mount CSI volume read-only, retry mounting with extra option noload")

options = append(options, "noload")
err = formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
err = ns.formatAndMount(devicePath, stagingTargetPath, fstype, options, ns.Mounter)
if err == nil {
klog.V(4).Infof("NodeStageVolume succeeded with \"noload\" option on %v to %s", volumeID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
Expand Down
31 changes: 27 additions & 4 deletions pkg/gce-pd-csi-driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"k8s.io/utils/exec"
testingexec "k8s.io/utils/exec/testing"
Expand Down Expand Up @@ -58,7 +59,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
return gceDriver
}

func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
gceDriver := GetGCEDriver()
mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter))
Expand All @@ -69,6 +70,18 @@ func getTestBlockingGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *
return gceDriver
}

// getTestBlockingFormatAndMountGCEDriver builds a GCEDriver for tests whose node server
// uses the fake blocking mounter driven by readyToExecute and has format-and-mount
// serialization enabled with a single slot and a 5 second slot timeout. The test is
// failed immediately if the driver cannot be set up.
func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
	gceDriver := GetGCEDriver()
	blockingMounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)

	baseServer := NewNodeServer(
		gceDriver,
		blockingMounter,
		deviceutils.NewFakeDeviceUtils(false),
		metadataservice.NewFakeService(),
		mountmanager.NewFakeStatter(blockingMounter),
	)
	serializedServer := baseServer.WithSerializedFormatAndMount(5*time.Second, 1)

	if err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, serializedServer); err != nil {
		t.Fatalf("Failed to setup GCE Driver: %v", err)
	}
	return gceDriver
}

func makeFakeCmd(fakeCmd *testingexec.FakeCmd, cmd string, args ...string) testingexec.FakeCommandAction {
c := cmd
a := args
Expand Down Expand Up @@ -849,9 +862,7 @@ func TestNodeGetCapabilities(t *testing.T) {
}
}

func TestConcurrentNodeOperations(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingGCEDriver(t, readyToExecute)
func runBlockingFormatAndMount(t *testing.T, gceDriver *GCEDriver, readyToExecute chan chan struct{}) {
ns := gceDriver.ns
tempDir, err := ioutil.TempDir("", "cno")
if err != nil {
Expand Down Expand Up @@ -931,3 +942,15 @@ func TestConcurrentNodeOperations(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

func TestBlockingMount(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingMountGCEDriver(t, readyToExecute)
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
}

func TestBlockingFormatAndMount(t *testing.T) {
readyToExecute := make(chan chan struct{}, 1)
gceDriver := getTestBlockingFormatAndMountGCEDriver(t, readyToExecute)
runBlockingFormatAndMount(t, gceDriver, readyToExecute)
}
29 changes: 28 additions & 1 deletion pkg/gce-pd-csi-driver/utils_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"strconv"
"strings"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -47,7 +48,33 @@ func getDevicePath(ns *GCENodeServer, volumeID, partition string) (string, error
return devicePath, nil
}

func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
if ns.formatAndMountSemaphore != nil {
done := make(chan any)
defer close(done)

// Aquire the semaphore. This will block if another formatAndMount has put an item
// into the semaphore channel.
ns.formatAndMountSemaphore <- struct{}{}
saikat-royc marked this conversation as resolved.
Show resolved Hide resolved

go func() {
defer func() { <-ns.formatAndMountSemaphore }()

// Add a timeout where so the semaphore will be released even if
// formatAndMount is still working. This allows the node to make progress on
// volumes if some error causes one formatAndMount to get stuck. The
// motivation for this serialization is to reduce memory usage; if stuck
// processes cause OOMs then the containers will be killed and restarted,
// including the stuck threads and with any luck making progress.
timeout := time.NewTimer(ns.formatAndMountTimeout)
defer timeout.Stop()

select {
case <-done:
case <-timeout.C:
saikat-royc marked this conversation as resolved.
Show resolved Hide resolved
}
}()
}
return m.FormatAndMount(source, target, fstype, options)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/gce-pd-csi-driver/utils_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
mounter "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)

func formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
func (ns *GCENodeServer) formatAndMount(source, target, fstype string, options []string, m *mount.SafeFormatAndMount) error {
if !strings.EqualFold(fstype, defaultWindowsFsType) {
return fmt.Errorf("GCE PD CSI driver can only supports %s file system, it does not support %s", defaultWindowsFsType, fstype)
}
Expand Down
12 changes: 7 additions & 5 deletions test/e2e/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ func GCEClientAndDriverSetup(instance *remote.InstanceInfo, computeEndpoint stri
binPath := path.Join(pkgPath, "bin/gce-pd-csi-driver")

endpoint := fmt.Sprintf("tcp://localhost:%s", port)
computeFlag := ""
extra_flags := []string{
fmt.Sprintf("--extra-labels=%s=%s", DiskLabelKey, DiskLabelValue),
"--max-concurrent-format-and-mount=10", // otherwise the serialization times out.
}
if computeEndpoint != "" {
computeFlag = fmt.Sprintf("--compute-endpoint %s", computeEndpoint)
extra_flags = append(extra_flags, fmt.Sprintf("--compute-endpoint %s", computeEndpoint))
}

workspace := remote.NewWorkspaceDir("gce-pd-e2e-")
// Log at V(6) as the compute API calls are emitted at that level and it's
// useful to see what's happening when debugging tests.
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s --extra-labels=%s=%s 2> %s/prog.out < /dev/null > /dev/null &'",
workspace, endpoint, computeFlag, DiskLabelKey, DiskLabelValue, workspace)
driverRunCmd := fmt.Sprintf("sh -c '/usr/bin/nohup %s/gce-pd-csi-driver -v=6 --endpoint=%s %s 2> %s/prog.out < /dev/null > /dev/null &'",
workspace, endpoint, strings.Join(extra_flags, " "), workspace)

config := &remote.ClientConfig{
PkgPath: pkgPath,
Expand Down