Skip to content

Commit

Permalink
fix: separate srcAccount and dstAccount for copyBlobContainer
Browse files Browse the repository at this point in the history
  • Loading branch information
acortelyou committed Jul 19, 2024
1 parent fda1e78 commit 34ff7a7
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 22 deletions.
29 changes: 15 additions & 14 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, accountName, validContainerName, storageEndpointSuffix); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -755,23 +755,24 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
})
}

// CopyBlobContainer copies a blob container in the same storage account
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
// CopyBlobContainer copies a blob container
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstAccountName, dstContainerName, storageEndpointSuffix string) error {
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()

}
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
_, srcAccountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
if srcContainerName == "" || dstContainerName == "" {
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
if srcAccountName == "" || srcContainerName == "" || dstAccountName == "" || dstContainerName == "" {
return fmt.Errorf("One or more of srcAccountName(%s), srcContainerName(%s), dstAccountName(%s), dstContainerName(%s) are empty", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
}

srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
srcPath := fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", dstAccountName, storageEndpointSuffix, dstContainerName)


jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
Expand All @@ -781,9 +782,9 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
case util.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstPath)
execFunc := func() error {
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
cmd := exec.Command("azcopy", "copy", srcPath + accountSasToken, dstPath + accountSasToken, "--recursive", "--check-length=false")
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
Expand All @@ -794,11 +795,11 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
}
timeoutFunc := func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstPath, percent)
}
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
klog.Warningf("CopyBlobContainer(%s, %s, %s, %s) failed with error: %v", srcAccountName, srcContainerName, dstAccountName, dstContainerName, copyErr)
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
Expand All @@ -808,13 +809,13 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken
}

// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstAccountName, dstContainerName, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstAccountName, dstContainerName, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
Expand Down
46 changes: 38 additions & 8 deletions pkg/blob/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,7 +1566,7 @@ func TestCopyVolume(t *testing.T) {
}

expectedErr := status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
err := d.copyVolume(req, "", nil, "", "core.windows.net")
err := d.copyVolume(req, "", nil, "", "", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1596,7 +1596,37 @@ func TestCopyVolume(t *testing.T) {
}

expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #")
err := d.copyVolume(req, "", nil, "dstContainer", "core.windows.net")
err := d.copyVolume(req, "", nil, "dstAccount", "dstContainer", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
},
},
{
name: "src account and dst account are different",
testFunc: func(t *testing.T) {
d := NewFakeDriver()
mp := map[string]string{}

volumeSource := &csi.VolumeContentSource_VolumeSource{
VolumeId: "rg#srcAccount##",
}
volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{
Volume: volumeSource,
}
volumecontensource := csi.VolumeContentSource{
Type: volumeContentSourceVolumeSource,
}

req := &csi.CreateVolumeRequest{
Name: "unit-test",
VolumeCapabilities: stdVolumeCapabilities,
Parameters: mp,
VolumeContentSource: &volumecontensource,
}

expectedErr := fmt.Errorf("One or more of srcAccountName(srcAccount), srcContainerName(), dstAccountName(dstAccount), dstContainerName(dstContainer) are empty")
err := d.copyVolume(req, "", nil, "dstAccount", "dstContainer", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1625,8 +1655,8 @@ func TestCopyVolume(t *testing.T) {
VolumeContentSource: &volumecontensource,
}

expectedErr := fmt.Errorf("srcContainerName() or dstContainerName(dstContainer) is empty")
err := d.copyVolume(req, "", nil, "dstContainer", "core.windows.net")
expectedErr := fmt.Errorf("One or more of srcAccountName(unit-test), srcContainerName(), dstAccountName(dstAccount), dstContainerName(dstContainer) are empty")
err := d.copyVolume(req, "", nil, "dstAccount", "dstContainer", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1655,8 +1685,8 @@ func TestCopyVolume(t *testing.T) {
VolumeContentSource: &volumecontensource,
}

expectedErr := fmt.Errorf("srcContainerName(fileshare) or dstContainerName() is empty")
err := d.copyVolume(req, "", nil, "", "core.windows.net")
expectedErr := fmt.Errorf("One or more of srcAccountName(f5713de20cde511e8ba4900), srcContainerName(fileshare), dstAccountName(), dstContainerName() are empty")
err := d.copyVolume(req, "", nil, "", "", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1698,7 +1728,7 @@ func TestCopyVolume(t *testing.T) {
d.azcopy.ExecCmd = m

var expectedErr error
err := d.copyVolume(req, "sastoken", nil, "dstContainer", "core.windows.net")
err := d.copyVolume(req, "sastoken", nil, "dstAccount", "dstContainer", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1738,7 +1768,7 @@ func TestCopyVolume(t *testing.T) {
d.azcopy.ExecCmd = m

expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%")
err := d.copyVolume(req, "sastoken", nil, "dstContainer", "core.windows.net")
err := d.copyVolume(req, "sastoken", nil, "dstAccount", "dstContainer", "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit 34ff7a7

Please sign in to comment.