azure: Migrate vfs to new SDK version

hakman committed Feb 9, 2024
1 parent a1242ef commit cf9bd3d
Showing 6 changed files with 84 additions and 202 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -50,7 +50,7 @@ UPLOAD_CMD=$(KOPS_ROOT)/hack/upload ${UPLOAD_ARGS}
unexport AWS_ACCESS_KEY_ID AWS_REGION AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN CNI_VERSION_URL DNS_IGNORE_NS_CHECK DNSCONTROLLER_IMAGE DO_ACCESS_TOKEN GOOGLE_APPLICATION_CREDENTIALS
unexport KOPS_BASE_URL KOPS_CLUSTER_NAME KOPS_RUN_OBSOLETE_VERSION KOPS_STATE_STORE KOPS_STATE_S3_ACL KUBE_API_VERSIONS NODEUP_URL OPENSTACK_CREDENTIAL_FILE SKIP_PACKAGE_UPDATE
unexport SKIP_REGION_CHECK S3_ACCESS_KEY_ID S3_ENDPOINT S3_REGION S3_SECRET_ACCESS_KEY HCLOUD_TOKEN SCW_ACCESS_KEY SCW_SECRET_KEY SCW_DEFAULT_PROJECT_ID SCW_PROFILE
unexport AZURE_CLIENT_ID AZURE_CLIENT_SECRET AZURE_STORAGE_ACCOUNT AZURE_STORAGE_KEY AZURE_SUBSCRIPTION_ID AZURE_TENANT_ID
unexport AZURE_CLIENT_ID AZURE_CLIENT_SECRET AZURE_STORAGE_ACCOUNT AZURE_SUBSCRIPTION_ID AZURE_TENANT_ID


VERSION=$(shell tools/get_version.sh | grep VERSION | awk '{print $$2}')
22 changes: 0 additions & 22 deletions docs/getting_started/azure.md
@@ -91,28 +91,6 @@ Set the env var `AZURE_STORAGE_ACCOUNT` to the storage account name for later us
$ export AZURE_STORAGE_ACCOUNT=kopstest
```

Get an access key of the account and set it in env var `AZURE_STORAGE_KEY` for later use.

```bash
$ az storage account keys list --account-name kopstest
[
{
"keyName": "key1",
"permissions": "Full",
"value": "RHWWn..."
},
{
"keyName": "key2",
"permissions": "Full",
"value": "..."
}

]

$ export AZURE_STORAGE_KEY="RHWWn..."
```
Then create a blob container.

```bash
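Note: this commit drops `AZURE_STORAGE_KEY` from the docs and from the env-var lists in the Makefile and `hack/update-expected.sh`, while keeping `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET` and `AZURE_TENANT_ID`, which suggests blob access now authenticates via Azure AD rather than an account key. The actual client construction happens in `getAzureBlobClient`, in a file not shown in this diff, so the following is only a minimal sketch of how the new SDK typically builds a token-credential client; the use of `azidentity.NewDefaultAzureCredential` and the env-var handling here are assumptions, not code from the commit.

```go
// Sketch only: not part of this commit. Assumes the azidentity package;
// the real client construction lives in getAzureBlobClient (not shown here).
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func main() {
	// DefaultAzureCredential reads AZURE_CLIENT_ID, AZURE_CLIENT_SECRET and
	// AZURE_TENANT_ID (among other sources), so no storage account key is required.
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		log.Fatal(err)
	}

	// The blob service endpoint is derived from the storage account name.
	serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", os.Getenv("AZURE_STORAGE_ACCOUNT"))

	client, err := azblob.NewClient(serviceURL, cred, nil)
	if err != nil {
		log.Fatal(err)
	}
	_ = client // ready for DownloadStream, UploadStream, NewListBlobsFlatPager, ...
}
```

`DefaultAzureCredential` also falls back to managed identities and Azure CLI logins, which is consistent with removing the `az storage account keys list` step from the getting-started docs.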
2 changes: 1 addition & 1 deletion hack/update-expected.sh
@@ -31,7 +31,7 @@ unset AWS_ACCESS_KEY_ID AWS_REGION AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN CNI_V
unset KOPS_CLUSTER_NAME KOPS_RUN_OBSOLETE_VERSION KOPS_STATE_STORE KOPS_STATE_S3_ACL KUBE_API_VERSIONS NODEUP_URL OPENSTACK_CREDENTIAL_FILE PROTOKUBE_IMAGE SKIP_PACKAGE_UPDATE
unset SKIP_REGION_CHECK S3_ACCESS_KEY_ID S3_ENDPOINT S3_REGION S3_SECRET_ACCESS_KEY
unset SCW_ACCESS_KEY SCW_SECRET_KEY SCW_DEFAULT_PROJECT_ID SCW_PROFILE
unset AZURE_CLIENT_ID AZURE_CLIENT_SECRET AZURE_STORAGE_ACCOUNT AZURE_STORAGE_KEY AZURE_SUBSCRIPTION_ID AZURE_TENANT_ID
unset AZURE_CLIENT_ID AZURE_CLIENT_SECRET AZURE_STORAGE_ACCOUNT AZURE_SUBSCRIPTION_ID AZURE_TENANT_ID
unset DIGITALOCEAN_ACCESS_TOKEN

# Run the tests in "autofix mode"
201 changes: 68 additions & 133 deletions util/pkg/vfs/azureblob.go
@@ -27,7 +27,10 @@ import (
"strings"
"sync"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"k8s.io/klog/v2"
"k8s.io/kops/util/pkg/hashing"
)

@@ -100,53 +103,44 @@ func (p *AzureBlobPath) Join(relativePath ...string) Path {

// ReadFile returns the content of the blob.
func (p *AzureBlobPath) ReadFile(ctx context.Context) ([]byte, error) {
var b bytes.Buffer
_, err := p.WriteTo(&b)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}

// WriteTo writes the content of the blob to the writer.
func (p *AzureBlobPath) WriteTo(w io.Writer) (n int64, err error) {
ctx := context.TODO()
klog.V(8).Infof("Reading file: %s - %s", p.container, p.key)

client, err := p.getClient(ctx)
if err != nil {
return 0, err
return nil, err
}

cURL, err := client.newContainerURL(p.container)
get, err := client.DownloadStream(ctx, p.container, p.key, nil)
if err != nil {
return 0, err
if bloberror.HasCode(err, bloberror.ContainerNotFound) || bloberror.HasCode(err, bloberror.BlobNotFound) {
return nil, os.ErrNotExist
}
return nil, err
}
resp, err := cURL.NewBlockBlobURL(p.key).Download(
ctx,
0, /* offset */
azblob.CountToEnd,
azblob.BlobAccessConditions{},
false, /* rangeGetContentMD5 */
azblob.ClientProvidedKeyOptions{},
)

b := &bytes.Buffer{}
retryReader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{})
_, err = b.ReadFrom(retryReader)
if err != nil {
serr, ok := err.(azblob.StorageError)
if ok && serr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
return 0, os.ErrNotExist
}
return 0, err
return nil, err
}
return io.Copy(w, resp.Body(azblob.RetryReaderOptions{MaxRetryRequests: 10}))

return b.Bytes(), nil
}

// WriteTo writes the content of the blob to the writer.
func (p *AzureBlobPath) WriteTo(w io.Writer) (n int64, err error) {
return 0, fmt.Errorf("not implemented")
}

// createFileLockAzureBlob prevents concurrent creates on the same
// file while maintaining atomicity of writes.
//
// This takes the same approach as S3Path.
var createFileLockAzureBlob sync.Mutex

// CreateFile writes the file contents only if the file does not already exist.
func (p *AzureBlobPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) error {
klog.V(8).Infof("Creating file: %s - %s", p.container, p.key)

createFileLockAzureBlob.Lock()
defer createFileLockAzureBlob.Unlock()

@@ -162,62 +156,43 @@ func (p *AzureBlobPath) CreateFile(ctx context.Context, data io.ReadSeeker, acl
}

// WriteFile writes the contents of the reader to the blob.
//
// TODO(kenji): Support ACL.
func (p *AzureBlobPath) WriteFile(ctx context.Context, data io.ReadSeeker, acl ACL) error {
klog.V(8).Infof("Writing file: %s - %s", p.container, p.key)

client, err := p.getClient(ctx)
if err != nil {
return err
}

md5Hash, err := hashing.HashAlgorithmMD5.Hash(data)
if err != nil {
_, err = client.CreateContainer(ctx, p.container, nil)
if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
return err
}
if _, err := data.Seek(0, 0); err != nil {
return fmt.Errorf("error seeking to start of data stream: %v", err)
}

cURL, err := client.newContainerURL(p.container)
if err != nil {
return err
}
// Use block blob. Other options are page blobs (optimized for
// random read/write) and append blob (optimized for append).
_, err = cURL.NewBlockBlobURL(p.key).Upload(
ctx,
data,
azblob.BlobHTTPHeaders{
ContentType: "application/octet-stream",
ContentMD5: md5Hash.HashValue,
},
azblob.Metadata{},
azblob.BlobAccessConditions{},
azblob.AccessTierNone,
azblob.BlobTagsMap{},
azblob.ClientProvidedKeyOptions{},
azblob.ImmutabilityPolicyOptions{},
)
_, err = client.UploadStream(ctx, p.container, p.key, data, nil)
return err
}

// Remove deletes the blob.
func (p *AzureBlobPath) Remove(ctx context.Context) error {
klog.V(8).Infof("Removing file: %q - %q", p.container, p.key)

client, err := p.getClient(ctx)
if err != nil {
return err
}

cURL, err := client.newContainerURL(p.container)
_, err = client.DeleteBlob(ctx, p.container, p.key, nil)
if err != nil {
return err
}
// Delete the blob, but keep its snapshot.
_, err = cURL.NewBlockBlobURL(p.key).Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
return err

return nil
}

func (p *AzureBlobPath) RemoveAll(ctx context.Context) error {
klog.V(8).Infof("Removing ALL files: %s - %s", p.container, p.key)

tree, err := p.ReadTree(ctx)
if err != nil {
return err
@@ -226,124 +201,84 @@ func (p *AzureBlobPath) RemoveAll(ctx context.Context) error {
for _, blobPath := range tree {
err := blobPath.Remove(ctx)
if err != nil {
return fmt.Errorf("error removing file %s: %w", blobPath, err)
return fmt.Errorf("removing file %s: %w", blobPath, err)
}
}

return nil
}

func (p *AzureBlobPath) RemoveAllVersions(ctx context.Context) error {
client, err := p.getClient(ctx)
klog.V(8).Infof("Removing ALL file versions: %s - %s", p.container, p.key)

tree, err := p.ReadTree(ctx)
if err != nil {
return err
}

cURL, err := client.newContainerURL(p.container)
if err != nil {
return err
for _, blobPath := range tree {
err := blobPath.Remove(ctx)
if err != nil {
return fmt.Errorf("removing file %s: %w", blobPath, err)
}
}
// Delete the blob and its snapshot.
_, err = cURL.NewBlockBlobURL(p.key).Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
return err

return nil
}

// ReadDir lists the blobs under the current Path.
func (p *AzureBlobPath) ReadDir() ([]Path, error) {
klog.V(8).Infof("Reading dir: %s - %s", p.container, p.key)

ctx := context.TODO()

client, err := p.getClient(ctx)
tree, err := p.ReadTree(ctx)
if err != nil {
return nil, err
}

var paths []Path
cURL, err := client.newContainerURL(p.container)
if err != nil {
return nil, err
}

prefix := p.key
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

for m := (azblob.Marker{}); m.NotDone(); {
// List all blobs that have the same prefix (without
// recursion). By specifying "/", the request will
// group blobs with their names up to the appearance of "/".
//
// Suppose that we have the following blobs:
//
// - cluster/cluster.spec
// - cluster/config
// - cluster/instancegroup/master-eastus-1
//
// When the prefix is set to "cluster/", the request
// returns "cluster/cluster.spec" and "cluster/config" in BlobItems
// and returns "cluster/instancegroup/" in BlobPrefixes.
resp, err := cURL.ListBlobsHierarchySegment(
ctx,
m,
"/", /* delimiter */
azblob.ListBlobsSegmentOptions{Prefix: prefix})
if err != nil {
return nil, nil
for _, blob := range tree {
if p.Join(blob.Base()).Path() == blob.Path() {
klog.V(8).Infof("Found file: %q", blob.Path())
paths = append(paths, blob)
}
for _, item := range resp.Segment.BlobItems {
paths = append(paths, &AzureBlobPath{
vfsContext: p.vfsContext,
container: p.container,
key: item.Name,
md5Hash: string(item.Properties.ContentMD5),
})
}
for _, prefix := range resp.Segment.BlobPrefixes {
paths = append(paths, &AzureBlobPath{
vfsContext: p.vfsContext,
container: p.container,
key: prefix.Name,
})
}

m = resp.NextMarker

}

return paths, nil
}

// ReadTree lists all blobs (recursively) in the subtree rooted at the current Path.
func (p *AzureBlobPath) ReadTree(ctx context.Context) ([]Path, error) {
klog.V(8).Infof("Reading tree: %s - %s", p.container, p.key)

client, err := p.getClient(ctx)
if err != nil {
return nil, err
}

var paths []Path
cURL, err := client.newContainerURL(p.container)
if err != nil {
return nil, err
}
for m := (azblob.Marker{}); m.NotDone(); {
resp, err := cURL.ListBlobsFlatSegment(ctx, m, azblob.ListBlobsSegmentOptions{Prefix: p.key})
pager := client.NewListBlobsFlatPager(p.container, &azblob.ListBlobsFlatOptions{
Prefix: to.Ptr(p.key),
})
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return nil, nil
return nil, err
}
for _, item := range resp.Segment.BlobItems {
for _, blob := range resp.Segment.BlobItems {
paths = append(paths, &AzureBlobPath{
vfsContext: p.vfsContext,
container: p.container,
key: item.Name,
md5Hash: string(item.Properties.ContentMD5),
key: *blob.Name,
})
}
m = resp.NextMarker

}

return paths, nil
}

// getClient returns the client for azure blob storage.
func (p *AzureBlobPath) getClient(ctx context.Context) (*azureClient, error) {
func (p *AzureBlobPath) getClient(ctx context.Context) (*azblob.Client, error) {
return p.vfsContext.getAzureBlobClient(ctx)
}
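For readers unfamiliar with the new SDK, the two patterns the migrated code leans on are (1) `DownloadStream` plus a retrying reader for reads, with `bloberror.HasCode` replacing the old `azblob.StorageError` type assertion, and (2) `NewListBlobsFlatPager` replacing the manual `Marker` loop for listings. Below is a minimal standalone sketch of both; the helper names `readBlob` and `listKeys` are illustrative only and assume an already-constructed `*azblob.Client`.

```go
// Sketch only: helper names are illustrative, not part of the commit.
package azblobsketch

import (
	"bytes"
	"context"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
)

// readBlob mirrors the migrated ReadFile: stream the blob, wrap the body in a
// retrying reader, and map "not found" errors to os.ErrNotExist.
func readBlob(ctx context.Context, client *azblob.Client, container, key string) ([]byte, error) {
	resp, err := client.DownloadStream(ctx, container, key, nil)
	if err != nil {
		if bloberror.HasCode(err, bloberror.ContainerNotFound) || bloberror.HasCode(err, bloberror.BlobNotFound) {
			return nil, os.ErrNotExist
		}
		return nil, err
	}

	var buf bytes.Buffer
	if _, err := buf.ReadFrom(resp.NewRetryReader(ctx, &azblob.RetryReaderOptions{})); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// listKeys mirrors the migrated ReadTree: a flat pager with a key prefix
// replaces the old marker-based ListBlobsFlatSegment loop.
func listKeys(ctx context.Context, client *azblob.Client, container, prefix string) ([]string, error) {
	var keys []string
	pager := client.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{
		Prefix: to.Ptr(prefix),
	})
	for pager.More() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		for _, item := range page.Segment.BlobItems {
			keys = append(keys, *item.Name)
		}
	}
	return keys, nil
}
```

Note also that listing errors are now propagated (`return nil, err`) where the old loop silently returned `nil, nil`.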
