Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

progressutil: refactor to use channels #64

Merged
merged 3 commits into from
May 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 67 additions & 59 deletions progressutil/iocopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,22 @@
package progressutil

import (
"errors"
"fmt"
"io"
"sync"
"time"
)

type copyReader struct {
reader io.Reader
current int64
total int64
done bool
doneLock sync.Mutex
pb *ProgressBar
}

func (cr *copyReader) getDone() bool {
cr.doneLock.Lock()
val := cr.done
cr.doneLock.Unlock()
return val
}
var (
ErrAlreadyStarted = errors.New("cannot add copies after PrintAndWait has been called")
)

func (cr *copyReader) setDone(val bool) {
cr.doneLock.Lock()
cr.done = val
cr.doneLock.Unlock()
type copyReader struct {
reader io.Reader
current int64
total int64
pb *ProgressBar
}

func (cr *copyReader) Read(p []byte) (int, error) {
Expand All @@ -50,9 +40,6 @@ func (cr *copyReader) Read(p []byte) (int, error) {
if err == nil {
err = err1
}
if err != nil {
cr.setDone(true)
}
return n, err
}

Expand All @@ -66,21 +53,37 @@ func (cr *copyReader) updateProgressBar() error {
return cr.pb.SetCurrentProgress(progress)
}

// NewCopyProgressPrinter returns a new CopyProgressPrinter
func NewCopyProgressPrinter() *CopyProgressPrinter {
return &CopyProgressPrinter{results: make(chan error), cancel: make(chan struct{})}
}

// CopyProgressPrinter will perform an arbitrary number of io.Copy calls, while
// continually printing the progress of each copy.
type CopyProgressPrinter struct {
readers []*copyReader
errors []error
results chan error
cancel chan struct{}

// `lock` mutex protects all fields below it in CopyProgressPrinter struct
lock sync.Mutex

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please document which fields are being protected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s-urbaniak aiui this is communicated through the struct layout - the lock protects the members listed immediately after it, and if there are multiple locks the struct members are grouped into blocks separated by an empty line. Wdyt?

readers []*copyReader
started bool
pbp *ProgressBarPrinter
}

// AddCopy adds a copy for this CopyProgressPrinter to perform. An io.Copy call
// will be made to copy bytes from reader to dest, and name and size will be
// used to label the progress bar and display how much progress has been made.
// If size is 0, the total size of the reader is assumed to be unknown.
func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) {
// AddCopy can only be called before PrintAndWait; otherwise, ErrAlreadyStarted
// will be returned.
func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) error {
cpp.lock.Lock()
defer cpp.lock.Unlock()

if cpp.started {
return ErrAlreadyStarted
}
if cpp.pbp == nil {
cpp.pbp = &ProgressBarPrinter{}
cpp.pbp.PadToBeEven = true
Expand All @@ -96,60 +99,65 @@ func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int6
cr.pb.SetPrintAfter(cr.formattedProgress())

cpp.readers = append(cpp.readers, cr)
cpp.lock.Unlock()

go func() {
_, err := io.Copy(dest, cr)
if err != nil {
cpp.lock.Lock()
cpp.errors = append(cpp.errors, err)
cpp.lock.Unlock()
select {
case <-cpp.cancel:
return
case cpp.results <- err:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, either return from both case statements or from neither (it is redundant, since go case statements do not fall through by default, but I'd accept that it helps clarify to keep them)

return
}
}()
return nil
}

// PrintAndWait will print the progress for each copy operation added with
// AddCopy to printTo every printInterval. This will continue until every added
// copy is finished, or until cancel is written to.
// PrintAndWait may only be called once; any subsequent calls will immediately
// return ErrAlreadyStarted. After PrintAndWait has been called, no more
// copies may be added to the CopyProgressPrinter.
func (cpp *CopyProgressPrinter) PrintAndWait(printTo io.Writer, printInterval time.Duration, cancel chan struct{}) error {
for {
// If cancel is not nil, see if anything has been written to it. If
// something has, return, otherwise keep drawing.
if cancel != nil {
select {
case <-cancel:
return nil
default:
}
}

cpp.lock.Lock()
readers := cpp.readers
errors := cpp.errors
cpp.lock.Lock()
if cpp.started {
cpp.lock.Unlock()
return ErrAlreadyStarted
}
cpp.started = true
cpp.lock.Unlock()

if len(errors) > 0 {
return errors[0]
}
n := len(cpp.readers)
if n == 0 {
// Nothing to do.
return nil
}

if len(readers) > 0 {
defer close(cpp.cancel)
t := time.NewTicker(printInterval)
allDone := false
for i := 0; i < n; {
select {
case <-cancel:
return nil
case <-t.C:
_, err := cpp.pbp.Print(printTo)
if err != nil {
return err
}
} else {
}

allDone := true
for _, r := range readers {
allDone = allDone && r.getDone()
}
if allDone && len(readers) > 0 {
return nil
case err := <-cpp.results:
i++
// Once completion is signaled, further on this just drains
// (unlikely) errors from the channel.
if err == nil && !allDone {
allDone, err = cpp.pbp.Print(printTo)
}
if err != nil {
return err
}
}

time.Sleep(printInterval)
}
return nil
}

func (cr *copyReader) formattedProgress() string {
Expand Down
4 changes: 2 additions & 2 deletions progressutil/progressbar.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func (pbp *ProgressBarPrinter) Print(printTo io.Writer) (bool, error) {
}
}

allDone := false
allDone := true
for _, bar := range bars {
if isTerminal(printTo) {
bar.printToTerminal(printTo, numColumns, pbp.PadToBeEven, pbp.maxBefore, pbp.maxAfter)
} else {
bar.printToNonTerminal(printTo)
}
allDone = allDone || bar.GetCurrentProgress() == 1
allDone = allDone && bar.GetCurrentProgress() == 1
}

pbp.numLinesInLastPrint = len(bars)
Expand Down