From c1641a63b09abff0feb9ec1e2eba37308ec06439 Mon Sep 17 00:00:00 2001 From: Alex Cortelyou Date: Thu, 25 Jul 2024 15:10:30 -0700 Subject: [PATCH 1/5] fix: clone volume content to requested volume --- pkg/blob/controllerserver.go | 193 ++++++++++++++++-------------- pkg/blob/controllerserver_test.go | 166 +++---------------------- 2 files changed, 117 insertions(+), 242 deletions(-) diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 636119147..e9ca2b8de 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -79,19 +79,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) return nil, status.Error(codes.InvalidArgument, err.Error()) } - if acquired := d.volumeLocks.TryAcquire(volName); !acquired { - // logging the job status if it's volume cloning - if req.GetVolumeContentSource() != nil { - jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{}) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - } - return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) - } - defer d.volumeLocks.Release(volName) - volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes()) requestGiB := int(util.RoundUpGiB(volSizeBytes)) + volContentSource := req.GetVolumeContentSource() + secrets := req.GetSecrets() + parameters := req.GetParameters() if parameters == nil { parameters = make(map[string]string) @@ -341,10 +334,31 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) GetLatestAccountKey: getLatestAccountKey, } + containerName = replaceWithMap(containerName, containerNameReplaceMap) + validContainerName := containerName + if validContainerName == "" { + validContainerName = volName + if containerNamePrefix != "" { + validContainerName = containerNamePrefix + "-" + volName + } + validContainerName = getValidContainerName(validContainerName, protocol) + setKeyValueInMap(parameters, containerNameField, validContainerName) + } + + if acquired := d.volumeLocks.TryAcquire(volName); !acquired { + // logging the job status if it's volume cloning + if volContentSource != nil { + jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{}) + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err) + } + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) + } + defer d.volumeLocks.Release(volName) + var volumeID string requestName := "controller_create_volume" - if req.GetVolumeContentSource() != nil { - switch req.VolumeContentSource.Type.(type) { + if volContentSource != nil { + switch volContentSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: requestName = "controller_create_volume_from_snapshot" case *csi.VolumeContentSource_Volume: @@ -357,9 +371,39 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID) }() + var srcAzcopyAuthEnv []string + var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string + if volContentSource != nil { + switch volContentSource.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") + case *csi.VolumeContentSource_Volume: + var srcVolumeID string + if volContentSource.GetVolume() != nil { + srcVolumeID = volContentSource.GetVolume().GetVolumeId() + } + srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID) + if err != nil { + return nil, status.Error(codes.NotFound, err.Error()) + } + srcAccountOptions := &azure.AccountOptions{ + Name: srcAccountName, + SubscriptionID: srcSubscriptionID, + ResourceGroup: srcResourceGroupName, + GetLatestAccountKey: getLatestAccountKey, + } + srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) + } + srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName) + default: + return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource) + } + } + var accountKey string accountName := account - secrets := req.GetSecrets() if len(secrets) == 0 && accountName == "" { if v, ok := d.volMap.Load(volName); ok { accountName = v.(string) @@ -413,31 +457,26 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) secrets = createStorageAccountSecret(accountName, accountKey) } - // replace pv/pvc name namespace metadata in subDir - containerName = replaceWithMap(containerName, containerNameReplaceMap) - validContainerName := containerName - if validContainerName == "" { - validContainerName = volName - if containerNamePrefix != "" { - validContainerName = containerNamePrefix + "-" + volName + dstAzcopyAuthEnv := srcAzcopyAuthEnv + dstAccountSASToken := srcAccountSASToken + if volContentSource != nil { + if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName { + if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil { + return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) + } } - validContainerName = getValidContainerName(validContainerName, protocol) - setKeyValueInMap(parameters, containerNameField, validContainerName) } - if req.GetVolumeContentSource() != nil { - accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) - } - if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil { + klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB) + if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil { + return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err) + } + + if volContentSource != nil { + dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName) + if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil { return nil, err } - } else { - klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB) - if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil { - return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err) - } } if storeAccountKey && len(req.GetSecrets()) == 0 { @@ -478,7 +517,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) VolumeId: volumeID, CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), VolumeContext: parameters, - ContentSource: req.GetVolumeContentSource(), + ContentSource: volContentSource, }, }, nil } @@ -724,71 +763,45 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN }) } -// CopyBlobContainer copies a blob container in the same storage account -func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error { - var sourceVolumeID string - if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil { - sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId() +// copyBlobContainer copies source volume content into a destination volume +func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error { - } - resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled - if err != nil { - return status.Error(codes.NotFound, err.Error()) - } - if srcContainerName == "" || dstContainerName == "" { - return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName) + if srcPath == "" || dstPath == "" || dstContainerName == "" { + return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName) } - timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute) - timeTick := time.Tick(waitForAzCopyInterval) - srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken) - dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken) - jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted { + switch jobState { + case util.AzcopyJobError, util.AzcopyJobCompleted: return err - } - klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName) - for { - select { - case <-timeTick: - jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - switch jobState { - case util.AzcopyJobError, util.AzcopyJobCompleted: - return err - case util.AzcopyJobNotFound: - klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName) - cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false") - if len(authAzcopyEnv) > 0 { - cmd.Env = append(os.Environ(), authAzcopyEnv...) - } - out, copyErr := cmd.CombinedOutput() - if copyErr != nil { - klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out)) - } else { - klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) - } - return copyErr + case util.AzcopyJobRunning: + return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent) + case util.AzcopyJobNotFound: + klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName) + execFunc := func() error { + cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false") + if len(authAzcopyEnv) > 0 { + cmd.Env = append(os.Environ(), authAzcopyEnv...) + } + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("exec error: %v, output: %v", err, string(out)) } - case <-timeAfter: - return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName) + return nil } + timeoutFunc := func() error { + _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) + return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent) + } + copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc) + if copyErr != nil { + klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr) + } else { + klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName) + } + return copyErr } -} - -// copyVolume copies a volume form volume or snapshot, snapshot is not supported now -func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error { - vs := req.VolumeContentSource - switch vs.Type.(type) { - case *csi.VolumeContentSource_Snapshot: - return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported") - case *csi.VolumeContentSource_Volume: - return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix) - default: - return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs) - } + return err } // authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index 0e2e89bcd..449ddeef1 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -755,12 +755,9 @@ func TestCreateVolume(t *testing.T) { defer ctrl.Finish() m := util.NewMockEXEC(ctrl) - listStr := "no error" - m.EXPECT().RunCommand(gomock.Any(), gomock.Any()).Return(listStr, nil) - d.azcopy.ExecCmd = m - expectedErr := status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported") + expectedErr := status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") _, err := d.CreateVolume(context.Background(), req) if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) @@ -821,9 +818,6 @@ func TestCreateVolume(t *testing.T) { defer ctrl.Finish() m := util.NewMockEXEC(ctrl) - listStr := "no error" - m.EXPECT().RunCommand(gomock.Any(), gomock.Any()).Return(listStr, nil) - d.azcopy.ExecCmd = m expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") @@ -1440,132 +1434,38 @@ func TestDeleteBlobContainer(t *testing.T) { } func TestCopyVolume(t *testing.T) { - stdVolumeCapability := &csi.VolumeCapability{ - AccessType: &csi.VolumeCapability_Mount{ - Mount: &csi.VolumeCapability_MountVolume{}, - }, - } - stdVolumeCapabilities := []*csi.VolumeCapability{ - stdVolumeCapability, - } testCases := []struct { name string testFunc func(t *testing.T) }{ { - name: "copy volume from volumeSnapshot is not supported", + name: "src path is empty", testFunc: func(t *testing.T) { d := NewFakeDriver() - mp := map[string]string{} - - volumeSnapshotSource := &csi.VolumeContentSource_SnapshotSource{ - SnapshotId: "unit-test", - } - volumeContentSourceSnapshotSource := &csi.VolumeContentSource_Snapshot{ - Snapshot: volumeSnapshotSource, - } - volumecontensource := csi.VolumeContentSource{ - Type: volumeContentSourceSnapshotSource, - } - req := &csi.CreateVolumeRequest{ - Name: "unit-test", - VolumeCapabilities: stdVolumeCapabilities, - Parameters: mp, - VolumeContentSource: &volumecontensource, - } - - expectedErr := status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported") - err := d.copyVolume(req, "", nil, "", "core.windows.net") + expectedErr := fmt.Errorf("srcPath() or dstPath(dstPath) or dstContainerName(dstContainer) is empty") + err := d.copyBlobContainer([]string{}, "", "", "dstPath", "", "dstContainer") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } }, }, { - name: "copy volume from volume not found", + name: "dst path is empty", testFunc: func(t *testing.T) { d := NewFakeDriver() - mp := map[string]string{} - - volumeSource := &csi.VolumeContentSource_VolumeSource{ - VolumeId: "unit-test", - } - volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ - Volume: volumeSource, - } - volumecontensource := csi.VolumeContentSource{ - Type: volumeContentSourceVolumeSource, - } - - req := &csi.CreateVolumeRequest{ - Name: "unit-test", - VolumeCapabilities: stdVolumeCapabilities, - Parameters: mp, - VolumeContentSource: &volumecontensource, - } - - expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") - err := d.copyVolume(req, "", nil, "dstContainer", "core.windows.net") + expectedErr := fmt.Errorf("srcPath(srcPath) or dstPath() or dstContainerName(dstContainer) is empty") + err := d.copyBlobContainer([]string{}, "srcPath", "", "", "", "dstContainer") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } }, }, { - name: "src blob container is empty", + name: "dst container is empty", testFunc: func(t *testing.T) { d := NewFakeDriver() - mp := map[string]string{} - - volumeSource := &csi.VolumeContentSource_VolumeSource{ - VolumeId: "rg#unit-test##", - } - volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ - Volume: volumeSource, - } - volumecontensource := csi.VolumeContentSource{ - Type: volumeContentSourceVolumeSource, - } - - req := &csi.CreateVolumeRequest{ - Name: "unit-test", - VolumeCapabilities: stdVolumeCapabilities, - Parameters: mp, - VolumeContentSource: &volumecontensource, - } - - expectedErr := fmt.Errorf("srcContainerName() or dstContainerName(dstContainer) is empty") - err := d.copyVolume(req, "", nil, "dstContainer", "core.windows.net") - if !reflect.DeepEqual(err, expectedErr) { - t.Errorf("Unexpected error: %v", err) - } - }, - }, - { - name: "dst blob container is empty", - testFunc: func(t *testing.T) { - d := NewFakeDriver() - mp := map[string]string{} - - volumeSource := &csi.VolumeContentSource_VolumeSource{ - VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", - } - volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ - Volume: volumeSource, - } - volumecontensource := csi.VolumeContentSource{ - Type: volumeContentSourceVolumeSource, - } - - req := &csi.CreateVolumeRequest{ - Name: "unit-test", - VolumeCapabilities: stdVolumeCapabilities, - Parameters: mp, - VolumeContentSource: &volumecontensource, - } - - expectedErr := fmt.Errorf("srcContainerName(fileshare) or dstContainerName() is empty") - err := d.copyVolume(req, "", nil, "", "core.windows.net") + expectedErr := fmt.Errorf("srcPath(srcPath) or dstPath(dstPath) or dstContainerName() is empty") + err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } @@ -1575,25 +1475,6 @@ func TestCopyVolume(t *testing.T) { name: "azcopy job is already completed", testFunc: func(t *testing.T) { d := NewFakeDriver() - mp := map[string]string{} - - volumeSource := &csi.VolumeContentSource_VolumeSource{ - VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", - } - volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ - Volume: volumeSource, - } - volumecontensource := csi.VolumeContentSource{ - Type: volumeContentSourceVolumeSource, - } - - req := &csi.CreateVolumeRequest{ - Name: "unit-test", - VolumeCapabilities: stdVolumeCapabilities, - Parameters: mp, - VolumeContentSource: &volumecontensource, - } - ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1601,13 +1482,13 @@ func TestCopyVolume(t *testing.T) { listStr := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr, nil) // if test.enableShow { - // m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstContainer -B 3")).Return(test.showStr, test.showErr) + // m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstContainer -B 3")).Return(test.showStr, test.showErr) // } d.azcopy.ExecCmd = m var expectedErr error - err := d.copyVolume(req, "sastoken", nil, "dstContainer", "core.windows.net") + err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "dstContainer") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } @@ -1617,25 +1498,6 @@ func TestCopyVolume(t *testing.T) { name: "azcopy job is first in progress and then be completed", testFunc: func(t *testing.T) { d := NewFakeDriver() - mp := map[string]string{} - - volumeSource := &csi.VolumeContentSource_VolumeSource{ - VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", - } - volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ - Volume: volumeSource, - } - volumecontensource := csi.VolumeContentSource{ - Type: volumeContentSourceVolumeSource, - } - - req := &csi.CreateVolumeRequest{ - Name: "unit-test", - VolumeCapabilities: stdVolumeCapabilities, - Parameters: mp, - VolumeContentSource: &volumecontensource, - } - ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1649,8 +1511,8 @@ func TestCopyVolume(t *testing.T) { d.azcopy.ExecCmd = m - var expectedErr error - err := d.copyVolume(req, "sastoken", nil, "dstContainer", "core.windows.net") + expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%") + err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "dstContainer") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } From 9449337d4cef609ec22e899865a4ae790a6a1bd9 Mon Sep 17 00:00:00 2001 From: umagnus Date: Mon, 29 Jul 2024 08:21:17 +0000 Subject: [PATCH 2/5] fix volume cloning and add e2e --- pkg/blob/controllerserver.go | 2 +- test/e2e/dynamic_provisioning_test.go | 75 +++++++++++++++++++ ...cally_provisioned_volume_cloning_tester.go | 17 +++-- 3 files changed, 87 insertions(+), 7 deletions(-) diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index e9ca2b8de..6b8c7c46e 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -780,7 +780,7 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc case util.AzcopyJobNotFound: klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName) execFunc := func() error { - cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false") + cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false") if len(authAzcopyEnv) > 0 { cmd.Env = append(os.Environ(), authAzcopyEnv...) } diff --git a/test/e2e/dynamic_provisioning_test.go b/test/e2e/dynamic_provisioning_test.go index b2479f6c4..177a9dd90 100644 --- a/test/e2e/dynamic_provisioning_test.go +++ b/test/e2e/dynamic_provisioning_test.go @@ -1001,4 +1001,79 @@ var _ = ginkgo.Describe("[blob-csi-e2e] Dynamic Provisioning", func() { } test.Run(ctx, cs, ns) }) + + ginkgo.It("should clone a volume from an existing NFSv3 volume to another storage class [nfs]", func(ctx ginkgo.SpecContext) { + pod := testsuites.PodDetails{ + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "10Gi", + MountOptions: []string{ + "nconnect=8", + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + podWithClonedVolume := testsuites.PodDetails{ + Cmd: "grep 'hello world' /mnt/test-1/data", + } + test := testsuites.DynamicallyProvisionedVolumeCloningTest{ + CSIDriver: testDriver, + Pod: pod, + PodWithClonedVolume: podWithClonedVolume, + StorageClassParameters: map[string]string{ + "skuName": "Premium_LRS", + "protocol": "nfs", + "mountPermissions": "0755", + "allowsharedkeyaccess": "true", + }, + ClonedStorageClassParameters: map[string]string{ + "skuName": "Standard_LRS", + "protocol": "nfs", + "mountPermissions": "0755", + "allowsharedkeyaccess": "true", + }, + } + test.Run(ctx, cs, ns) + }) + + ginkgo.It("should clone a volume from an existing blobfuse2 volume to another storage class [fuse2]", func(ctx ginkgo.SpecContext) { + pod := testsuites.PodDetails{ + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "10Gi", + MountOptions: []string{ + "-o allow_other", + "--virtual-directory=true", // blobfuse2 mount options + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + podWithClonedVolume := testsuites.PodDetails{ + Cmd: "grep 'hello world' /mnt/test-1/data", + } + test := testsuites.DynamicallyProvisionedVolumeCloningTest{ + CSIDriver: testDriver, + Pod: pod, + PodWithClonedVolume: podWithClonedVolume, + StorageClassParameters: map[string]string{ + "skuName": "Standard_LRS", + "protocol": "fuse2", + }, + ClonedStorageClassParameters: map[string]string{ + "skuName": "Premium_LRS", + "protocol": "fuse2", + }, + } + test.Run(ctx, cs, ns) + }) }) diff --git a/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go b/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go index 955926078..f16ac2a74 100644 --- a/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go +++ b/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go @@ -29,11 +29,12 @@ import ( // DynamicallyProvisionedVolumeCloningTest will provision required StorageClass(es), PVC(s) and Pod(s) // ClonedVolumeSize optional for when testing for cloned volume with different size to the original volume type DynamicallyProvisionedVolumeCloningTest struct { - CSIDriver driver.DynamicPVTestDriver - Pod PodDetails - PodWithClonedVolume PodDetails - ClonedVolumeSize string - StorageClassParameters map[string]string + CSIDriver driver.DynamicPVTestDriver + Pod PodDetails + PodWithClonedVolume PodDetails + ClonedVolumeSize string + StorageClassParameters map[string]string + ClonedStorageClassParameters map[string]string } func (t *DynamicallyProvisionedVolumeCloningTest) Run(ctx context.Context, client clientset.Interface, namespace *v1.Namespace) { @@ -69,7 +70,11 @@ func (t *DynamicallyProvisionedVolumeCloningTest) Run(ctx context.Context, clien } t.PodWithClonedVolume.Volumes = []VolumeDetails{clonedVolume} - tpod, cleanups = t.PodWithClonedVolume.SetupWithDynamicVolumes(ctx, client, namespace, t.CSIDriver, t.StorageClassParameters) + clonedStorageClassParameters := t.StorageClassParameters + if t.ClonedStorageClassParameters != nil { + clonedStorageClassParameters = t.ClonedStorageClassParameters + } + tpod, cleanups = t.PodWithClonedVolume.SetupWithDynamicVolumes(ctx, client, namespace, t.CSIDriver, clonedStorageClassParameters) for i := range cleanups { defer cleanups[i](ctx) } From 875ce98b472a193afb70e9973d354cb3f0571166 Mon Sep 17 00:00:00 2001 From: umagnus Date: Wed, 31 Jul 2024 08:17:46 +0000 Subject: [PATCH 3/5] fix --- pkg/blob/controllerserver_test.go | 5 +---- pkg/blob/volume_lock.go | 3 ++- pkg/util/util.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index 449ddeef1..173d09290 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -1503,11 +1503,8 @@ func TestCopyVolume(t *testing.T) { m := util.NewMockEXEC(ctrl) listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" - listStr2 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" - o1 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1) + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1) m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil) - o2 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr2, nil) - gomock.InOrder(o1, o2) d.azcopy.ExecCmd = m diff --git a/pkg/blob/volume_lock.go b/pkg/blob/volume_lock.go index 2996d230d..95f779a9a 100644 --- a/pkg/blob/volume_lock.go +++ b/pkg/blob/volume_lock.go @@ -23,7 +23,8 @@ import ( ) const ( - volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" + volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" + volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs diff --git a/pkg/util/util.go b/pkg/util/util.go index c010db385..8349d36f4 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -23,6 +23,7 @@ import ( "regexp" "strings" "sync" + "time" "github.com/go-ini/ini" "github.com/pkg/errors" @@ -341,3 +342,30 @@ func GetKubeClient(kubeconfig string, kubeAPIQPS float64, kubeAPIBurst int, user kubeCfg.UserAgent = userAgent return kubernetes.NewForConfig(kubeCfg) } + +// ExecFunc returns a exec function's output and error +type ExecFunc func() (err error) + +// TimeoutFunc returns output and error if an ExecFunc timeout +type TimeoutFunc func() (err error) + +// WaitUntilTimeout waits for the exec function to complete or return timeout error +func WaitUntilTimeout(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error { + // Create a channel to receive the result of the azcopy exec function + done := make(chan bool) + var err error + + // Start the azcopy exec function in a goroutine + go func() { + err = execFunc() + done <- true + }() + + // Wait for the function to complete or time out + select { + case <-done: + return err + case <-time.After(timeout): + return timeoutFunc() + } +} From 22cb998661e455b417ec393e56b9ba327c415435 Mon Sep 17 00:00:00 2001 From: umagnus Date: Thu, 1 Aug 2024 02:24:51 +0000 Subject: [PATCH 4/5] fix --- pkg/blob/controllerserver.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 6b8c7c46e..3589e1e2e 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -59,8 +59,6 @@ const ( MSI = "MSI" SPN = "SPN" authorizationPermissionMismatch = "AuthorizationPermissionMismatch" - - waitForAzCopyInterval = 2 * time.Second ) // CreateVolume provisions a volume From a3ad4c7101b05ebde4a1743dcb6ffdfef8b3f914 Mon Sep 17 00:00:00 2001 From: umagnus Date: Fri, 2 Aug 2024 07:52:36 +0000 Subject: [PATCH 5/5] add ut --- pkg/util/util_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 9956c2773..7843fabeb 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -600,3 +600,55 @@ users: } } } + +func TestWaitUntilTimeout(t *testing.T) { + tests := []struct { + desc string + timeout time.Duration + execFunc ExecFunc + timeoutFunc TimeoutFunc + expectedErr error + }{ + { + desc: "execFunc returns error", + timeout: 1 * time.Second, + execFunc: func() error { + return fmt.Errorf("execFunc error") + }, + timeoutFunc: func() error { + return fmt.Errorf("timeout error") + }, + expectedErr: fmt.Errorf("execFunc error"), + }, + { + desc: "execFunc timeout", + timeout: 1 * time.Second, + execFunc: func() error { + time.Sleep(2 * time.Second) + return nil + }, + timeoutFunc: func() error { + return fmt.Errorf("timeout error") + }, + expectedErr: fmt.Errorf("timeout error"), + }, + { + desc: "execFunc completed successfully", + timeout: 1 * time.Second, + execFunc: func() error { + return nil + }, + timeoutFunc: func() error { + return fmt.Errorf("timeout error") + }, + expectedErr: nil, + }, + } + + for _, test := range tests { + err := WaitUntilTimeout(test.timeout, test.execFunc, test.timeoutFunc) + if err != nil && (err.Error() != test.expectedErr.Error()) { + t.Errorf("unexpected error: %v, expected error: %v", err, test.expectedErr) + } + } +}