Writerat (#223)
* Add illumos build tag in addition to solaris

* #7508079 [Go][Blob][2019-12-12] Blob Versioning (#190)

* Generated code for 12-12-2019 spec

* Fix test

* Changes

* Basic Testing and modification in WithVersionId function.

* Added Tags and Versions in BlobListingDetails.

* Added Tests

* Added TestCases

* Commented out tests which require versioning disabled.

* Added Tests

* Test cases 1-on-1 with Python SDK

* Moved all tests to same file for ease of accessibility

Co-authored-by: zezha-msft <zezha@microsoft.com>

* update to go1.14

* Minor Jumbo Blob Fix and Blob Versioning fix (#198)

* Minor Jumbo Blob fix + versioning fix

* Test Case Fix

* Renamed struct back to original

* Changed block blob limit (#199)

* Minor versioning fix (#200)

* [Go][Blob][2019-02-02] Set tier support on copy/put blob API (#203)

* Added tier parameter in upload block blob function signature + Fixed usage + Wrote a test case for validation.

* Added tier parameter in
a. CopyFromURL, CommitBlockList of Block Blob
b. Create (Page Blob)
Fixed all occurrences

* Minor Change

* Added test

* Rev go to 1.15, adal to 0.9.2 (#205)

Update go to latest version
Update adal dependency

* Fixing BlockBlobMaxUploadBlobBytes value (#207)

Reverting BlockBlobMaxUploadBlobBytes to 256MB

* Consider 502 as a temporary error (#204)

* [highlevel] Stop using memory-mapped files

While investigating this SDK for uploading and downloading large blobs
(e.g. 25GB or more) it became apparent that the memory-mapped approach
has some severe limitations:

1. Limits the file size on 32-bit systems (theoretically 4GB, but much
   less in practice).
2. Has no backpressure when writing to slower storage mediums.
3. Appears to finish faster, but the OS spends several minutes flushing
   the modified RAM to disk afterwards (depends on the speed of the
   disk).

On a VM with 16GB of RAM and a slow disk (spinning in this case) the
algorithm quickly overwhelms the available memory and causes severe
performance degradation. It ended up simultaneously trying to flush to
the slow data disk and page out to the slightly faster OS disk.

The solution is to stop using memory-mapped files (at least the way the
SDK currently uses them) and switch to the `io.ReaderAt` and
`io.WriterAt` interfaces. They explicitly allow for parallel access to
non-overlapping regions, which makes them a good candidate for this
purpose.

Benchmarking large downloads (a 25GB file) with a test app, the difference
between azcopy 10.4.3 and these updates is within 10 seconds. When compared
against the original code on a beefy machine with plenty of RAM, the
original's measured execution time is faster, but there is a little bit of
delay afterwards while the last of the data flushes from RAM to disk.
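
To make the pattern concrete, here is a minimal sketch (not code from this commit) of parallel writes to non-overlapping regions through io.WriterAt. An *os.File satisfies both io.ReaderAt and io.WriterAt, so writes are paced by the disk instead of accumulating in RAM:

```go
package main

import (
	"bytes"
	"io"
	"log"
	"os"
	"sync"
)

// parallelFill writes non-overlapping chunks of dst concurrently. Because
// every goroutine owns a distinct byte range, io.WriterAt needs no locking,
// and a file-backed writer lets the OS apply backpressure on the writes.
func parallelFill(dst io.WriterAt, size, chunk int64) error {
	var wg sync.WaitGroup
	errs := make(chan error, int((size+chunk-1)/chunk))
	for off := int64(0); off < size; off += chunk {
		n := chunk
		if size-off < n {
			n = size - off
		}
		wg.Add(1)
		go func(off, n int64) {
			defer wg.Done()
			// Each goroutine owns the byte range [off, off+n).
			if _, err := dst.WriteAt(bytes.Repeat([]byte{'x'}, int(n)), off); err != nil {
				errs <- err
			}
		}(off, n)
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		return err // report the first failure, if any
	}
	return nil
}

func main() {
	f, err := os.Create("writerat-demo.dat")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// *os.File implements both io.ReaderAt and io.WriterAt.
	if err := parallelFill(f, 1<<20, 256<<10); err != nil {
		log.Fatal(err)
	}
}
```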

* PR feedback

Co-authored-by: Till Wegmueller <toasterson@gmail.com>
Co-authored-by: Ze Qian Zhang <zezha@microsoft.com>
Co-authored-by: Mohit Sharma <65536214+mohsha-msft@users.noreply.github.com>
Co-authored-by: Jonas-Taha El Sesiy <github@elsesiy.com>
Co-authored-by: mohsha-msft <mohsha@microsoft.com>
Co-authored-by: Kyle Farnung <kfarnung@outlook.com>
7 people authored Oct 27, 2020
1 parent 510f9f0 commit 4e8f2d4
Showing 44 changed files with 5,590 additions and 841 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -1,10 +1,11 @@
language: go
go:
- "1.13"
- "1.15"
script:
- export GO111MODULE=on
- GOOS=linux go build ./azblob
- GOOS=darwin go build ./azblob
- GOOS=windows go build ./azblob
- GOOS=solaris go build ./azblob
- GOOS=illumos go build ./azblob
- go test -race -short -cover -v ./azblob
24 changes: 24 additions & 0 deletions azblob/bytes_writer.go
@@ -0,0 +1,24 @@
package azblob

import (
"errors"
)

type bytesWriter []byte

func newBytesWriter(b []byte) bytesWriter {
return b
}

func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) {
if off >= int64(len(c)) || off < 0 {
return 0, errors.New("Offset value is out of range")
}

n := copy(c[int(off):], b)
if n < len(b) {
return n, errors.New("Not enough space for all bytes")
}

return n, nil
}
30 changes: 30 additions & 0 deletions azblob/bytes_writer_test.go
@@ -0,0 +1,30 @@
package azblob

import (
"bytes"

chk "gopkg.in/check.v1"
)

func (s *aztestsSuite) TestBytesWriterWriteAt(c *chk.C) {
b := make([]byte, 10)
buffer := newBytesWriter(b)

count, err := buffer.WriteAt([]byte{1, 2}, 10)
c.Assert(err, chk.ErrorMatches, "Offset value is out of range")
c.Assert(count, chk.Equals, 0)

count, err = buffer.WriteAt([]byte{1, 2}, -1)
c.Assert(err, chk.ErrorMatches, "Offset value is out of range")
c.Assert(count, chk.Equals, 0)

count, err = buffer.WriteAt([]byte{1, 2}, 9)
c.Assert(err, chk.ErrorMatches, "Not enough space for all bytes")
c.Assert(count, chk.Equals, 1)
c.Assert(bytes.Compare(b, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}), chk.Equals, 0)

count, err = buffer.WriteAt([]byte{1, 2}, 8)
c.Assert(err, chk.IsNil)
c.Assert(count, chk.Equals, 2)
c.Assert(bytes.Compare(b, []byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 2}), chk.Equals, 0)
}
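
The bytesWriter adapter above is what keeps the buffer-based download API on the new io.WriterAt path. A hedged usage sketch follows (the BlobURL construction, credentials, and blob size lookup are assumed to happen elsewhere):

```go
package blobexample

import (
	"context"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

// downloadToMemory is a sketch: it assumes the caller has already built a
// BlobURL (pipeline and credentials omitted) and knows the blob's size.
func downloadToMemory(ctx context.Context, blobURL azblob.BlobURL, size int64) ([]byte, error) {
	buf := make([]byte, size)
	// Offset 0 and count 0 download the entire blob. Internally buf is
	// wrapped by newBytesWriter, so the same io.WriterAt download path is
	// used as for files.
	err := azblob.DownloadBlobToBuffer(ctx, blobURL, 0, 0, buf, azblob.DownloadFromBlobOptions{
		BlockSize:   4 * 1024 * 1024, // fetch in 4MB chunks
		Parallelism: 8,               // up to 8 chunks in flight
	})
	return buf, err
}
```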
4 changes: 2 additions & 2 deletions azblob/chunkwriting.go
@@ -17,7 +17,7 @@ import (
// This allows us to provide a local implementation that fakes the server for hermetic testing.
type blockWriter interface {
StageBlock(context.Context, string, io.ReadSeeker, LeaseAccessConditions, []byte) (*BlockBlobStageBlockResponse, error)
CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions) (*BlockBlobCommitBlockListResponse, error)
CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions, AccessTierType) (*BlockBlobCommitBlockListResponse, error)
}

// copyFromReader copies a source io.Reader to blob storage using concurrent uploads.
@@ -201,7 +201,7 @@ func (c *copier) close() error {
}

var err error
c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions)
c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions, c.o.BlobAccessTier)
return err
}

2 changes: 1 addition & 1 deletion azblob/chunkwriting_test.go
@@ -58,7 +58,7 @@ func (f *fakeBlockWriter) StageBlock(ctx context.Context, blockID string, r io.R
return &BlockBlobStageBlockResponse{}, nil
}

func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, blockIDs []string, headers BlobHTTPHeaders, meta Metadata, access BlobAccessConditions) (*BlockBlobCommitBlockListResponse, error) {
func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, blockIDs []string, headers BlobHTTPHeaders, meta Metadata, access BlobAccessConditions, tier AccessTierType) (*BlockBlobCommitBlockListResponse, error) {
dst, err := os.OpenFile(filepath.Join(f.path, finalFileName), os.O_CREATE+os.O_WRONLY, 0600)
if err != nil {
return nil, err
72 changes: 36 additions & 36 deletions azblob/highlevel.go
@@ -55,56 +55,58 @@ type UploadToBlockBlobOptions struct {
// AccessConditions indicates the access conditions for the block blob.
AccessConditions BlobAccessConditions

// BlobAccessTier indicates the tier of blob
BlobAccessTier AccessTierType

// Parallelism indicates the maximum number of blocks to upload in parallel (0=default)
Parallelism uint16
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
func UploadBufferToBlockBlob(ctx context.Context, b []byte,
// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
bufferSize := int64(len(b))
if o.BlockSize == 0 {
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
return nil, errors.New("buffer is too large to upload to a block blob")
}
// If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
if bufferSize <= BlockBlobMaxUploadBlobBytes {
if readerSize <= BlockBlobMaxUploadBlobBytes {
o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
} else {
o.BlockSize = bufferSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
o.BlockSize = BlobDefaultDownloadBlockSize
}
// StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
}
}

if bufferSize <= BlockBlobMaxUploadBlobBytes {
if readerSize <= BlockBlobMaxUploadBlobBytes {
// If the size can fit in 1 Upload call, do it this way
var body io.ReadSeeker = bytes.NewReader(b)
var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
if o.Progress != nil {
body = pipeline.NewRequestBodyProgress(body, o.Progress)
}
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier)
}

var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1)
var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)

blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
progress := int64(0)
progressLock := &sync.Mutex{}

err := DoBatchTransfer(ctx, BatchTransferOptions{
OperationName: "UploadBufferToBlockBlob",
TransferSize: bufferSize,
OperationName: "uploadReaderAtToBlockBlob",
TransferSize: readerSize,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(offset int64, count int64, ctx context.Context) error {
// This function is called once per block.
// It is passed this block's offset within the buffer and its count of bytes
// Prepare to read the proper block/section of the buffer
var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count])
var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
blockNum := offset / o.BlockSize
if o.Progress != nil {
blockProgress := int64(0)
@@ -130,7 +132,13 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
return nil, err
}
// All put blocks were successful, call Put Block List to finalize the blob
return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions)
return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier)
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
func UploadBufferToBlockBlob(ctx context.Context, b []byte,
blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
return uploadReaderAtToBlockBlob(ctx, bytes.NewReader(b), int64(len(b)), blockBlobURL, o)
}

// UploadFileToBlockBlob uploads a file in blocks to a block blob.
@@ -141,15 +149,7 @@ func UploadFileToBlockBlob(ctx context.Context, file *os.File,
if err != nil {
return nil, err
}
m := mmf{} // Default to an empty slice; used for 0-size file
if stat.Size() != 0 {
m, err = newMMF(file, false, 0, int(stat.Size()))
if err != nil {
return nil, err
}
defer m.unmap()
}
return UploadBufferToBlockBlob(ctx, m, blockBlobURL, o)
return uploadReaderAtToBlockBlob(ctx, file, stat.Size(), blockBlobURL, o)
}

///////////////////////////////////////////////////////////////////////////////
@@ -174,9 +174,9 @@ type DownloadFromBlobOptions struct {
RetryReaderOptionsPerBlock RetryReaderOptions
}

// downloadBlobToBuffer downloads an Azure blob to a buffer with parallel.
func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
b []byte, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
// downloadBlobToWriterAt downloads an Azure blob to a buffer with parallel.
func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64,
writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
if o.BlockSize == 0 {
o.BlockSize = BlobDefaultDownloadBlockSize
}
@@ -194,12 +194,17 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
}
}

if count <= 0 {
// The file is empty, there is nothing to download.
return nil
}

// Prepare and do parallel download.
progress := int64(0)
progressLock := &sync.Mutex{}

err := DoBatchTransfer(ctx, BatchTransferOptions{
OperationName: "downloadBlobToBuffer",
OperationName: "downloadBlobToWriterAt",
TransferSize: count,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
@@ -222,7 +227,7 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
progressLock.Unlock()
})
}
_, err = io.ReadFull(body, b[chunkStart:chunkStart+count])
_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
body.Close()
return err
},
@@ -237,7 +242,7 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
// Offset and count are optional, pass 0 for both to download the entire blob.
func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
b []byte, o DownloadFromBlobOptions) error {
return downloadBlobToBuffer(ctx, blobURL, offset, count, b, o, nil)
return downloadBlobToWriterAt(ctx, blobURL, offset, count, newBytesWriter(b), o, nil)
}

// DownloadBlobToFile downloads an Azure blob to a local file.
@@ -271,13 +276,7 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
}

if size > 0 {
// 3. Set mmap and call downloadBlobToBuffer.
m, err := newMMF(file, true, 0, int(size))
if err != nil {
return err
}
defer m.unmap()
return downloadBlobToBuffer(ctx, blobURL, offset, size, m, o, nil)
return downloadBlobToWriterAt(ctx, blobURL, offset, size, file, o, nil)
} else { // if the blob's size is 0, there is no need in downloading it
return nil
}
@@ -363,6 +362,7 @@ type UploadStreamToBlockBlobOptions struct {
BlobHTTPHeaders BlobHTTPHeaders
Metadata Metadata
AccessConditions BlobAccessConditions
BlobAccessTier AccessTierType
}

func (u *UploadStreamToBlockBlobOptions) defaults() {
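
The download hunk above streams each chunk through newSectionWriter, which this commit adds in a file not shown in this excerpt. As a rough, illustrative reconstruction (names and error messages are guesses, not the repository's actual code), such an adapter, effectively an io.WriterAt counterpart to io.SectionReader, could look like this:

```go
package blobexample // standalone sketch; the real file lives in package azblob

import (
	"errors"
	"io"
)

// sectionWriter (illustrative) exposes a bounded window of an io.WriterAt as
// an io.Writer, so io.Copy can stream one chunk's body to the right offset
// without touching any other chunk's region.
type sectionWriter struct {
	dst      io.WriterAt
	base     int64 // start of this section within dst
	size     int64 // length of this section
	position int64 // bytes written so far
}

func newSectionWriter(dst io.WriterAt, base, size int64) *sectionWriter {
	return &sectionWriter{dst: dst, base: base, size: size}
}

func (s *sectionWriter) Write(p []byte) (int, error) {
	if int64(len(p)) > s.size-s.position {
		// Refuse writes that would spill past the section boundary.
		return 0, errors.New("write exceeds the section size")
	}
	n, err := s.dst.WriteAt(p, s.base+s.position)
	s.position += int64(n)
	return n, err
}
```

The upload side applies the same idea in reverse: UploadFileToBlockBlob now hands the *os.File to uploadReaderAtToBlockBlob directly, and io.NewSectionReader carves out each block's non-overlapping read range.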
27 changes: 26 additions & 1 deletion azblob/parsing_urls.go
@@ -1,13 +1,15 @@
package azblob

import (
"errors"
"net"
"net/url"
"strings"
)

const (
snapshot = "snapshot"
versionId = "versionid"
SnapshotTimeFormat = "2006-01-02T15:04:05.0000000Z07:00"
)

@@ -23,6 +25,7 @@ type BlobURLParts struct {
Snapshot string // "" if not a snapshot
SAS SASQueryParameters
UnparsedParams string
VersionID string // "" if not versioning enabled
}

// IPEndpointStyleInfo is used for IP endpoint style URL when working with Azure storage emulator.
@@ -85,12 +88,20 @@ func NewBlobURLParts(u url.URL) BlobURLParts {
// Convert the query parameters to a case-sensitive map & trim whitespace
paramsMap := u.Query()

up.Snapshot = "" // Assume no snapshot
up.Snapshot = "" // Assume no snapshot
up.VersionID = "" // Assume no versionID
if snapshotStr, ok := caseInsensitiveValues(paramsMap).Get(snapshot); ok {
up.Snapshot = snapshotStr[0]
// If we recognized the query parameter, remove it from the map
delete(paramsMap, snapshot)
}

if versionIDs, ok := caseInsensitiveValues(paramsMap).Get(versionId); ok {
up.VersionID = versionIDs[0]
// If we recognized the query parameter, remove it from the map
delete(paramsMap, versionId) // delete "versionid" from paramsMap
delete(paramsMap, "versionId") // delete "versionId" from paramsMap
}
up.SAS = newSASQueryParameters(paramsMap, true)
up.UnparsedParams = paramsMap.Encode()
return up
@@ -124,6 +135,11 @@ func (up BlobURLParts) URL() url.URL {

rawQuery := up.UnparsedParams

// Check: Both snapshot and version id cannot be present in the request URL.
if up.Snapshot != "" && up.VersionID != "" {
errors.New("Snapshot and versioning cannot be enabled simultaneously")
}

//If no snapshot is initially provided, fill it in from the SAS query properties to help the user
if up.Snapshot == "" && !up.SAS.snapshotTime.IsZero() {
up.Snapshot = up.SAS.snapshotTime.Format(SnapshotTimeFormat)
@@ -136,6 +152,15 @@ func (up BlobURLParts) URL() url.URL {
}
rawQuery += snapshot + "=" + up.Snapshot
}

// Concatenate blob version id query parameter (if it exists)
if up.VersionID != "" {
if len(rawQuery) > 0 {
rawQuery += "&"
}
rawQuery += versionId + "=" + up.VersionID
}

sas := up.SAS.Encode()
if sas != "" {
if len(rawQuery) > 0 {
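
A small usage sketch of the new VersionID plumbing (the account, container, blob name, and version id below are placeholders); it only exercises URL parsing, so it runs without any Azure credentials:

```go
package main

import (
	"fmt"
	"net/url"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
	// Placeholder URL; the versionid query parameter is what the parser
	// now recognizes and strips from UnparsedParams.
	raw := "https://myaccount.blob.core.windows.net/mycontainer/myblob?versionid=2020-10-27T10:00:00.1234567Z"
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}

	parts := azblob.NewBlobURLParts(*u)
	fmt.Println(parts.VersionID) // "2020-10-27T10:00:00.1234567Z"

	// URL() re-attaches the version id as a query parameter.
	rebuilt := parts.URL()
	fmt.Println(rebuilt.String())
}
```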
15 changes: 14 additions & 1 deletion azblob/sas_service.go
@@ -44,6 +44,14 @@ func (v BlobSASSignatureValues) NewSASQueryParameters(credential StorageAccountC
return SASQueryParameters{}, err
}
v.Permissions = perms.String()
} else if v.Version != "" {
resource = "bv"
//Make sure the permission characters are in the correct order
perms := &BlobSASPermissions{}
if err := perms.Parse(v.Permissions); err != nil {
return SASQueryParameters{}, err
}
v.Permissions = perms.String()
} else if v.BlobName == "" {
// Make sure the permission characters are in the correct order
perms := &ContainerSASPermissions{}
@@ -209,7 +217,7 @@ func (p *ContainerSASPermissions) Parse(s string) error {

// The BlobSASPermissions type simplifies creating the permissions string for an Azure Storage blob SAS.
// Initialize an instance of this type and then call its String method to set BlobSASSignatureValues's Permissions field.
type BlobSASPermissions struct{ Read, Add, Create, Write, Delete bool }
type BlobSASPermissions struct{ Read, Add, Create, Write, Delete, DeletePreviousVersion bool }

// String produces the SAS permissions string for an Azure Storage blob.
// Call this method to set BlobSASSignatureValues's Permissions field.
@@ -230,6 +238,9 @@ func (p BlobSASPermissions) String() string {
if p.Delete {
b.WriteRune('d')
}
if p.DeletePreviousVersion {
b.WriteRune('x')
}
return b.String()
}

@@ -248,6 +259,8 @@ func (p *BlobSASPermissions) Parse(s string) error {
p.Write = true
case 'd':
p.Delete = true
case 'x':
p.DeletePreviousVersion = true
default:
return fmt.Errorf("Invalid permission: '%v'", r)
}
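
And a matching sketch for the new SAS permission flag (no credentials involved; it only builds and parses the permission string):

```go
package main

import (
	"fmt"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
	// DeletePreviousVersion maps to the 'x' character in the SAS
	// permission string.
	perms := azblob.BlobSASPermissions{Read: true, DeletePreviousVersion: true}
	fmt.Println(perms.String()) // "rx"

	// Parse accepts the same character and round-trips it.
	var parsed azblob.BlobSASPermissions
	if err := parsed.Parse("rx"); err != nil {
		panic(err)
	}
	fmt.Println(parsed.DeletePreviousVersion) // true
}
```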