Skip to content

Commit

Permalink
Merge pull request #1520 from umagnus/release-1.23-fix_volume_cloning
Browse files Browse the repository at this point in the history
[release-1.23] fix: clone volume content to requested volume
  • Loading branch information
andyzhangx authored Aug 2, 2024
2 parents 5a136f7 + a3ad4c7 commit dc5cf2f
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 255 deletions.
195 changes: 103 additions & 92 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ const (
MSI = "MSI"
SPN = "SPN"
authorizationPermissionMismatch = "AuthorizationPermissionMismatch"

waitForAzCopyInterval = 2 * time.Second
)

// CreateVolume provisions a volume
Expand All @@ -79,19 +77,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
// logging the job status if it's volume cloning
if req.GetVolumeContentSource() != nil {
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
}
defer d.volumeLocks.Release(volName)

volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
requestGiB := int(util.RoundUpGiB(volSizeBytes))

volContentSource := req.GetVolumeContentSource()
secrets := req.GetSecrets()

parameters := req.GetParameters()
if parameters == nil {
parameters = make(map[string]string)
Expand Down Expand Up @@ -341,10 +332,31 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
GetLatestAccountKey: getLatestAccountKey,
}

containerName = replaceWithMap(containerName, containerNameReplaceMap)
validContainerName := containerName
if validContainerName == "" {
validContainerName = volName
if containerNamePrefix != "" {
validContainerName = containerNamePrefix + "-" + volName
}
validContainerName = getValidContainerName(validContainerName, protocol)
setKeyValueInMap(parameters, containerNameField, validContainerName)
}

if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
// logging the job status if it's volume cloning
if volContentSource != nil {
jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
}
defer d.volumeLocks.Release(volName)

var volumeID string
requestName := "controller_create_volume"
if req.GetVolumeContentSource() != nil {
switch req.VolumeContentSource.Type.(type) {
if volContentSource != nil {
switch volContentSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
requestName = "controller_create_volume_from_snapshot"
case *csi.VolumeContentSource_Volume:
Expand All @@ -357,9 +369,39 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
}()

var srcAzcopyAuthEnv []string
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
if volContentSource != nil {
switch volContentSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
case *csi.VolumeContentSource_Volume:
var srcVolumeID string
if volContentSource.GetVolume() != nil {
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
}
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
srcAccountOptions := &azure.AccountOptions{
Name: srcAccountName,
SubscriptionID: srcSubscriptionID,
ResourceGroup: srcResourceGroupName,
GetLatestAccountKey: getLatestAccountKey,
}
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
default:
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
}
}

var accountKey string
accountName := account
secrets := req.GetSecrets()
if len(secrets) == 0 && accountName == "" {
if v, ok := d.volMap.Load(volName); ok {
accountName = v.(string)
Expand Down Expand Up @@ -413,31 +455,26 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
secrets = createStorageAccountSecret(accountName, accountKey)
}

// replace pv/pvc name namespace metadata in subDir
containerName = replaceWithMap(containerName, containerNameReplaceMap)
validContainerName := containerName
if validContainerName == "" {
validContainerName = volName
if containerNamePrefix != "" {
validContainerName = containerNamePrefix + "-" + volName
dstAzcopyAuthEnv := srcAzcopyAuthEnv
dstAccountSASToken := srcAccountSASToken
if volContentSource != nil {
if srcSubscriptionID != subsID || srcResourceGroupName != resourceGroup || srcAccountName != accountName {
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
}
validContainerName = getValidContainerName(validContainerName, protocol)
setKeyValueInMap(parameters, containerNameField, validContainerName)
}

if req.GetVolumeContentSource() != nil {
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
}

if volContentSource != nil {
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
return nil, err
}
} else {
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
}
}

if storeAccountKey && len(req.GetSecrets()) == 0 {
Expand Down Expand Up @@ -478,7 +515,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
VolumeId: volumeID,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: parameters,
ContentSource: req.GetVolumeContentSource(),
ContentSource: volContentSource,
},
}, nil
}
Expand Down Expand Up @@ -724,71 +761,45 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
})
}

// CopyBlobContainer copies a blob container in the same storage account
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
// copyBlobContainer copies source volume content into a destination volume
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {

}
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
if srcContainerName == "" || dstContainerName == "" {
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
if srcPath == "" || dstPath == "" || dstContainerName == "" {
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
}

timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute)
timeTick := time.Tick(waitForAzCopyInterval)
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
switch jobState {
case util.AzcopyJobError, util.AzcopyJobCompleted:
return err
}
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
for {
select {
case <-timeTick:
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case util.AzcopyJobError, util.AzcopyJobCompleted:
return err
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
out, copyErr := cmd.CombinedOutput()
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
return copyErr
case util.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
execFunc := func() error {
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false")
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
case <-timeAfter:
return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
return nil
}
timeoutFunc := func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent)
}
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
}
return copyErr
}
}

// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
return err
}

// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
Expand Down
Loading

0 comments on commit dc5cf2f

Please sign in to comment.