Skip to content

Commit

Permalink
[Azcore] simplify progress objects (Azure#15194)
Browse files Browse the repository at this point in the history
* started combining

* single progress for request and response

* adding docstrings

* modify build-test to find silent error

* undoing eng change

* removing race and covermode from the test command
  • Loading branch information
seankane-msft authored and vindicatesociety committed Sep 18, 2021
1 parent 61a3020 commit 27a1115
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 53 deletions.
92 changes: 41 additions & 51 deletions sdk/azcore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,74 +5,64 @@

package azcore

import "io"

// ********** The following is common between the request body AND the response body.
import (
"io"
)

// ProgressReceiver defines the signature of a callback function invoked as progress is reported.
// Note that bytesTransferred resets to 0 if the stream is reset when retrying a network operation.
type ProgressReceiver func(bytesTransferred int64)

// ********** The following are specific to the request body (a ReadSeekCloser)
type progress struct {
rc io.ReadCloser
rsc ReadSeekCloser
pr ProgressReceiver
offset int64
}

// This struct is used when sending a body to the network
type requestBodyProgress struct {
requestBody ReadSeekCloser // Seeking is required to support retries
pr ProgressReceiver
// NewRequestProgress adds progress reporting to an HTTP request's body stream.
func NewRequestProgress(body ReadSeekCloser, pr ProgressReceiver) ReadSeekCloser {
return &progress{
rc: body,
rsc: body,
pr: pr,
offset: 0,
}
}

// NewRequestBodyProgress adds progress reporting to an HTTP request's body stream.
func NewRequestBodyProgress(requestBody ReadSeekCloser, pr ProgressReceiver) ReadSeekCloser {
return &requestBodyProgress{requestBody: requestBody, pr: pr}
// NewResponseProgress adds progress reporting to an HTTP response's body stream.
func NewResponseProgress(body io.ReadCloser, pr ProgressReceiver) io.ReadCloser {
return &progress{
rc: body,
rsc: nil,
pr: pr,
offset: 0,
}
}

// Read reads a block of data from an inner stream and reports progress
func (rbp *requestBodyProgress) Read(p []byte) (n int, err error) {
n, err = rbp.requestBody.Read(p)
if err != nil {
func (p *progress) Read(b []byte) (n int, err error) {
n, err = p.rc.Read(b)
if err != nil && err != io.EOF {
return
}
p.offset += int64(n)
// Invokes the user's callback method to report progress
position, err := rbp.requestBody.Seek(0, io.SeekCurrent)
if err != nil {
return
}
rbp.pr(position)
p.pr(p.offset)
return
}

func (rbp *requestBodyProgress) Seek(offset int64, whence int) (offsetFromStart int64, err error) {
return rbp.requestBody.Seek(offset, whence)
// Seek only expects a zero or from beginning.
func (p *progress) Seek(offset int64, whence int) (int64, error) {
// This should only ever be called with offset = 0 and whence = io.SeekStart
n, err := p.rsc.Seek(offset, whence)
if err == nil {
p.offset = int64(n)
}
return n, err
}

// requestBodyProgress supports Close but the underlying stream may not; if it does, Close will close it.
func (rbp *requestBodyProgress) Close() error {
return rbp.requestBody.Close()
}

// ********** The following are specific to the response body (a ReadCloser)

// This struct is used when sending a body to the network
type responseBodyProgress struct {
responseBody io.ReadCloser
pr ProgressReceiver
offset int64
}

// NewResponseBodyProgress adds progress reporting to an HTTP response's body stream.
func NewResponseBodyProgress(responseBody io.ReadCloser, pr ProgressReceiver) io.ReadCloser {
return &responseBodyProgress{responseBody: responseBody, pr: pr, offset: 0}
}

// Read reads a block of data from an inner stream and reports progress
func (rbp *responseBodyProgress) Read(p []byte) (n int, err error) {
n, err = rbp.responseBody.Read(p)
rbp.offset += int64(n)

// Invokes the user's callback method to report progress
rbp.pr(rbp.offset)
return
}

func (rbp *responseBodyProgress) Close() error {
return rbp.responseBody.Close()
func (p *progress) Close() error {
return p.rc.Close()
}
51 changes: 49 additions & 2 deletions sdk/azcore/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package azcore
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"reflect"
Expand All @@ -33,7 +34,7 @@ func TestProgressReporting(t *testing.T) {
}
req.SkipBodyDownload()
var bytesSent int64
reqRpt := NewRequestBodyProgress(NopCloser(body), func(bytesTransferred int64) {
reqRpt := NewRequestProgress(NopCloser(body), func(bytesTransferred int64) {
bytesSent = bytesTransferred
})
if err := req.SetBody(reqRpt, "application/octet-stream"); err != nil {
Expand All @@ -44,7 +45,7 @@ func TestProgressReporting(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
var bytesReceived int64
respRpt := NewResponseBodyProgress(resp.Body, func(bytesTransferred int64) {
respRpt := NewResponseProgress(resp.Body, func(bytesTransferred int64) {
bytesReceived = bytesTransferred
})
defer respRpt.Close()
Expand All @@ -62,3 +63,49 @@ func TestProgressReporting(t *testing.T) {
t.Fatal("request and response bodies don't match")
}
}

// Ensure there is a seek to 0
// do some reading, call a seek, then make sure reads are from the beginning
func TestProgressReportingSeek(t *testing.T) {
const contentSize = 4096
content := make([]byte, contentSize)
for i := 0; i < contentSize; i++ {
content[i] = byte(i % 255)
}
body := bytes.NewReader(content)
srv, close := mock.NewServer()
defer close()
srv.SetResponse(mock.WithBody(content))
pl := NewPipeline(srv)
req, err := NewRequest(context.Background(), http.MethodGet, srv.URL())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
req.SkipBodyDownload()
var bytesSent int64
reqRpt := NewRequestProgress(NopCloser(body), func(bytesTransferred int64) {
bytesSent = bytesTransferred
})
if err := req.SetBody(reqRpt, "application/octet-stream"); err != nil {
t.Fatal(err)
}
_, err = pl.Do(req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if bytesSent == 0 {
t.Fatalf("bytesSent unexpectedly 0")
}

_, err = reqRpt.Seek(0, io.SeekStart)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
n, err := reqRpt.Read(content)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if n != contentSize {
t.Fatalf("Seek did not reset Reader")
}
}

0 comments on commit 27a1115

Please sign in to comment.