
Merge pull request #3 from JeffreyRichter/master
Improved logging
JeffreyRichter authored Nov 13, 2017
2 parents fa5a548 + 8849533 commit 9149890
Showing 14 changed files with 661 additions and 375 deletions.
38 changes: 18 additions & 20 deletions 2016-05-31/azblob/credential_shared_key.go
@@ -121,9 +121,8 @@ func (f *SharedKeyCredential) buildStringToSign(request pipeline.Request) string
 		headers.Get(headerIfUnmodifiedSince),
 		headers.Get(headerRange),
 		buildCanonicalizedHeader(headers),
-		f.buildCanonicalizedResource(request),
+		f.buildCanonicalizedResource(request.URL),
 	}, "\n")

 	return stringToSign
 }
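
Note (not part of this diff, for context only): the string-to-sign built above is typically signed with HMAC-SHA256 using the base64-decoded account key. A minimal standalone sketch; every name and value here is an assumption for illustration:

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

func main() {
	accountKey := []byte("decoded-account-key") // assumption: base64-decoded storage account key
	stringToSign := "GET\n...\n/myaccount/mycontainer" // assumption: output of buildStringToSign
	mac := hmac.New(sha256.New, accountKey)
	mac.Write([]byte(stringToSign))
	// This value would be sent as "Authorization: SharedKey <account>:<signature>"
	fmt.Println(base64.StdEncoding.EncodeToString(mac.Sum(nil)))
}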

@@ -156,43 +155,42 @@ func buildCanonicalizedHeader(headers http.Header) string {
 	return string(ch.Bytes())
 }

-func (f *SharedKeyCredential) buildCanonicalizedResource(request pipeline.Request) string {
+func (f *SharedKeyCredential) buildCanonicalizedResource(u *url.URL) string {
 	// https://docs.microsoft.com/en-us/rest/api/storageservices/authentication-for-the-azure-storage-services
 	cr := bytes.NewBufferString("/")
 	cr.WriteString(f.accountName)

-	if len(request.URL.Path) > 0 {
+	if len(u.Path) > 0 {
 		// Any portion of the CanonicalizedResource string that is derived from
 		// the resource's URI should be encoded exactly as it is in the URI.
 		// -- https://msdn.microsoft.com/en-gb/library/azure/dd179428.aspx
-		cr.WriteString(request.URL.EscapedPath())
+		cr.WriteString(u.EscapedPath())
 	} else {
 		// a slash is required to indicate the root path
 		cr.WriteString("/")
 	}

-	params, err := url.ParseQuery(request.URL.RawQuery)
+	// params is a map[string][]string; the param name is the key; the param values are a []string
+	params, err := url.ParseQuery(u.RawQuery) // Returns URL-decoded values
 	if err != nil {
 		panic(err)
 	}

-	if len(params) > 0 {
-		cr.WriteRune('\n')
-
-		keys := []string{}
-		for key := range params {
-			keys = append(keys, key)
+	if len(params) > 0 { // There is at least 1 query parameter
+		paramNames := []string{} // We use this to sort the parameter key names
+		for paramName := range params {
+			paramNames = append(paramNames, paramName) // paramNames must be lowercase
 		}
-		sort.Strings(keys)
+		sort.Strings(paramNames)

-		completeParams := []string{}
-		for _, key := range keys {
-			if len(params[key]) > 1 {
-				sort.Strings(params[key])
-			}
+		for _, paramName := range paramNames {
+			paramValues := params[paramName]
+			sort.Strings(paramValues)

-			completeParams = append(completeParams, strings.Join([]string{key, ":", strings.Join(params[key], ",")}, ""))
+			// Join the sorted parameter values separated by ','
+			// then prepend "paramName:" and append this string to the buffer
+			cr.WriteString("\n" + paramName + ":" + strings.Join(paramValues, ","))
 		}
-		cr.WriteString(strings.Join(completeParams, "\n"))
 	}
 	return string(cr.Bytes())
 }
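
Note (illustrative, not from this commit): a standalone sketch of the canonicalized-resource shape the rewritten loop produces, reimplemented here with an assumed account name and URL:

package main

import (
	"fmt"
	"net/url"
	"sort"
	"strings"
)

func main() {
	u, _ := url.Parse("https://myaccount.blob.core.windows.net/mycontainer?restype=container&comp=list")
	cr := "/" + "myaccount" + u.EscapedPath() // account name is an assumption
	params, _ := url.ParseQuery(u.RawQuery)
	names := make([]string, 0, len(params))
	for name := range params {
		names = append(names, name)
	}
	sort.Strings(names) // parameter names sorted lexically
	for _, name := range names {
		values := params[name]
		sort.Strings(values) // each parameter's values sorted, then comma-joined
		cr += "\n" + name + ":" + strings.Join(values, ",")
	}
	fmt.Printf("%q\n", cr) // "/myaccount/mycontainer\ncomp:list\nrestype:container"
}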
82 changes: 41 additions & 41 deletions 2016-05-31/azblob/fitness.go → 2016-05-31/azblob/highlevel.go
@@ -3,7 +3,6 @@ package azblob
 import (
 	"context"
 	"encoding/base64"
-	"encoding/binary"
 	"fmt"
 	"io"
 	"net"
@@ -12,9 +11,9 @@ import (
 	"github.com/Azure/azure-pipeline-go/pipeline"
 )

-// StreamToBlockBlobOptions identifies options used by the StreamToBlockBlob function. Note that the
+// UploadStreamToBlockBlobOptions identifies options used by the UploadStreamToBlockBlob function. Note that the
 // BlockSize field is mandatory and must be set; other fields are optional.
-type StreamToBlockBlobOptions struct {
+type UploadStreamToBlockBlobOptions struct {
 	// BlockSize is mandatory. It specifies the block size to use; the maximum size is BlockBlobMaxPutBlockBytes.
 	BlockSize int64
@@ -26,18 +25,23 @@ type StreamToBlockBlobOptions struct {
 	// Metadata indicates the metadata to be associated with the blob when PutBlockList is called.
 	Metadata Metadata
-	// BlobAccessConditions???

+	// AccessConditions indicates the access conditions for the block blob.
+	AccessConditions BlobAccessConditions
 }

-// StreamToBlockBlob uploads a large stream of data in blocks to a block blob.
-func StreamToBlockBlob(ctx context.Context, stream io.ReaderAt, streamSize int64,
-	blockBlobURL BlockBlobURL, o StreamToBlockBlobOptions) (*BlockBlobsPutBlockListResponse, error) {
+// UploadStreamToBlockBlob uploads a stream of data in blocks to a block blob.
+func UploadStreamToBlockBlob(ctx context.Context, stream io.ReaderAt, streamSize int64,
+	blockBlobURL BlockBlobURL, o UploadStreamToBlockBlobOptions) (*BlockBlobsPutBlockListResponse, error) {

 	if o.BlockSize <= 0 || o.BlockSize > BlockBlobMaxPutBlockBytes {
 		panic(fmt.Sprintf("BlockSize option must be > 0 and <= %d", BlockBlobMaxPutBlockBytes))
 	}

 	numBlocks := ((streamSize - int64(1)) / o.BlockSize) + 1
+	if numBlocks > BlockBlobMaxBlocks {
+		panic(fmt.Sprintf("The streamSize is too big or the BlockSize is too small; the number of blocks must be <= %d", BlockBlobMaxBlocks))
+	}
 	blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
 	blockSize := o.BlockSize

@@ -54,50 +58,47 @@ func StreamToBlockBlob(ctx context.Context, stream io.ReaderAt, streamSize int64
 				func(bytesTransferred int64) { o.Progress(streamOffset + bytesTransferred) })
 		}

-		blockIDList[blockNum] = blockIDUint64ToBase64(uint64(streamOffset)) // The streamOffset is the block ID
-		_, err := blockBlobURL.PutBlock(ctx, blockIDList[blockNum], body, LeaseAccessConditions{})
+		// Block IDs are unique values to avoid issues if 2+ clients are uploading blocks
+		// at the same time, causing PutBlockList to get a mix of blocks from all the clients.
+		blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
+		_, err := blockBlobURL.PutBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions)
 		if err != nil {
 			return nil, err
 		}
 	}
-	return blockBlobURL.PutBlockList(ctx, blockIDList, o.Metadata, o.BlobHTTPHeaders, BlobAccessConditions{})
+	return blockBlobURL.PutBlockList(ctx, blockIDList, o.Metadata, o.BlobHTTPHeaders, o.AccessConditions)
 }

-// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the blob must be the same length
-// This helper function converts an int64 block ID to a base-64 string
-func blockIDUint64ToBase64(blockID uint64) string {
-	binaryBlockID := [64 / 8]byte{} // All block IDs are 8 bytes long
-	binary.LittleEndian.PutUint64(binaryBlockID[:], blockID)
-	return base64.StdEncoding.EncodeToString(binaryBlockID[:])
-}
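
Note (hedged usage sketch, not part of this commit): calling the renamed function. ctx, blockBlobURL, and the 4 MB block size are illustrative assumptions:

	data := []byte("hello, blob")                     // assumed payload
	_, err := UploadStreamToBlockBlob(ctx, bytes.NewReader(data), int64(len(data)),
		blockBlobURL,                                 // assumed to be already constructed
		UploadStreamToBlockBlobOptions{BlockSize: 4 * 1024 * 1024}) // BlockSize is required
	if err != nil {
		// handle the error
	}

bytes.NewReader returns a *bytes.Reader, which satisfies the io.ReaderAt parameter.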

// GetRetryStreamOptions is used to configure a call to NewGetTryStream to download a large stream with intelligent retries.
type GetRetryStreamOptions struct {
// DownloadStreamOptions is used to configure a call to NewDownloadBlobToStream to download a large stream with intelligent retries.
type DownloadStreamOptions struct {
// Range indicates the starting offset and count of bytes within the blob to download.
Range BlobRange

// Acc indicates the BlobAccessConditions to use when accessing the blob.
AC BlobAccessConditions

// GetBlobResult identifies a function to invoke immediately after GetRetryStream's Read method internally
// calls GetBlob. This function is invoked after every call to GetBlob. The callback can example GetBlob's
// response and error information.
GetBlobResult func(*GetResponse, error)
// AccessConditions indicates the BlobAccessConditions to use when accessing the blob.
AccessConditions BlobAccessConditions
}

type retryStream struct {
ctx context.Context
blobURL BlobURL
o GetRetryStreamOptions
getBlob func(ctx context.Context, blobRange BlobRange, ac BlobAccessConditions, rangeGetContentMD5 bool) (*GetResponse, error)
o DownloadStreamOptions
response *http.Response
}

// NewGetRetryStream creates a stream over a blob allowing you download the blob's contents.
// NewDownloadStream creates a stream over a blob allowing you download the blob's contents.
// When network errors occur, the retry stream internally issues new HTTP GET requests for
// the remaining range of the blob's contents.
func NewGetRetryStream(ctx context.Context, blobURL BlobURL, o GetRetryStreamOptions) io.ReadCloser {
// the remaining range of the blob's contents. The GetBlob argument identifies the function
// to invoke when the GetRetryStream needs to make an HTTP GET request as Read methods are called.
// The callback can wrap the response body (with progress reporting, for example) before returning.
func NewDownloadStream(ctx context.Context,
getBlob func(ctx context.Context, blobRange BlobRange, ac BlobAccessConditions, rangeGetContentMD5 bool) (*GetResponse, error),
o DownloadStreamOptions) io.ReadCloser {

// BlobAccessConditions may already have an If-Match:etag header
return &retryStream{ctx: ctx, blobURL: blobURL, o: o, response: nil}
if getBlob == nil {
panic("getBlob must not be nil")
}
return &retryStream{ctx: ctx, getBlob: getBlob, o: o, response: nil}
}

 func (s *retryStream) Read(p []byte) (n int, err error) {
@@ -111,6 +112,7 @@ func (s *retryStream) Read(p []byte) (n int, err error) {
 			}
 			return n, err // Return the result to the caller
 		}
+		s.Close()
 		s.response = nil // Something went wrong; our stream is no longer good
 		if nerr, ok := err.(net.Error); ok {
 			if !nerr.Timeout() && !nerr.Temporary() {
@@ -122,25 +124,23 @@ func (s *retryStream) Read(p []byte) (n int, err error) {
 		}

 		// We don't have a response stream to read from; try to get one
-		response, err := s.blobURL.GetBlob(s.ctx, s.o.Range, s.o.AC, false)
-		if s.o.GetBlobResult != nil {
-			// If the caller desires notification of each GetBlob call, notify them
-			s.o.GetBlobResult(response, err)
-		}
+		response, err := s.getBlob(s.ctx, s.o.Range, s.o.AccessConditions, false)
 		if err != nil {
 			return 0, err
 		}
 		// Successful GET; this is the network stream we'll read from
 		s.response = response.Response()

 		// Ensure that future requests are from the same version of the source
-		s.o.AC.IfMatch = response.ETag()
+		s.o.AccessConditions.IfMatch = response.ETag()

 		// Loop around and try to read from this stream
 	}
 }

 func (s *retryStream) Close() error {
-	//s.blobURL = BlobURL{} // This blobURL is no longer valid
-	return s.response.Body.Close()
+	if s.response != nil && s.response.Body != nil {
+		return s.response.Body.Close()
+	}
+	return nil
 }
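
Note (hedged usage sketch, not part of this commit): wiring a blob's GetBlob method into NewDownloadStream. ctx and blobURL are assumed to exist; the GetBlob signature mirrors the removed s.blobURL.GetBlob(...) call above:

	stream := NewDownloadStream(ctx,
		func(ctx context.Context, r BlobRange, ac BlobAccessConditions, md5 bool) (*GetResponse, error) {
			return blobURL.GetBlob(ctx, r, ac, md5) // a caller could wrap the response body here
		},
		DownloadStreamOptions{}) // zero-value Range assumed to mean "the whole blob"
	defer stream.Close()
	_, err := io.Copy(ioutil.Discard, stream) // Reads retry transient network errors internally
	if err != nil {
		// handle the error
	}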
15 changes: 13 additions & 2 deletions 2016-05-31/azblob/parsing_urls.go
@@ -48,11 +48,11 @@ func NewBlobURLParts(u url.URL) BlobURLParts {
 		}
 	}

-	// Convert the query parameters to a case-sensitive map & trim whitsapce
+	// Convert the query parameters to a case-sensitive map & trim whitespace
 	paramsMap := u.Query()

 	up.Snapshot = time.Time{} // Assume no snapshot
-	if snapshotStr, ok := paramsMap["snapshot"]; ok {
+	if snapshotStr, ok := caseInsensitiveValues(paramsMap).Get("snapshot"); ok {
 		up.Snapshot, _ = time.Parse(snapshotTimeFormat, snapshotStr[0])
 		// If we recognized the query parameter, remove it from the map
 		delete(paramsMap, "snapshot")
@@ -62,6 +62,17 @@ func NewBlobURLParts(u url.URL) BlobURLParts {
 	return up
 }

+type caseInsensitiveValues url.Values // map[string][]string
+
+func (v caseInsensitiveValues) Get(key string) ([]string, bool) {
+	key = strings.ToLower(key)
+	for k, value := range v { // use a distinct loop variable so it doesn't shadow key
+		if strings.ToLower(k) == key {
+			return value, true
+		}
+	}
+	return []string{}, false
+}

 // URL returns a URL object whose fields are initialized from the BlobURLParts fields. The URL's RawQuery
 // field contains the SAS, snapshot, and unparsed query parameters.
 func (up BlobURLParts) URL() url.URL {
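
Note (illustrative sketch): how the case-insensitive lookup above behaves; the snapshot value is an assumed sample:

	values := caseInsensitiveValues(url.Values{"SnapShot": {"2017-11-13T00:00:00Z"}})
	if v, ok := values.Get("snapshot"); ok {
		fmt.Println(v[0]) // matches despite the different casing
	}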