Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

progressutil: refactor to use channels+select #63

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 57 additions & 60 deletions progressutil/iocopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,22 @@
package progressutil

import (
"errors"
"fmt"
"io"
"sync"
"time"
)

type copyReader struct {
reader io.Reader
current int64
total int64
done bool
doneLock sync.Mutex
pb *ProgressBar
}

func (cr *copyReader) getDone() bool {
cr.doneLock.Lock()
val := cr.done
cr.doneLock.Unlock()
return val
}
var (
ErrAlreadyStarted = errors.New("cannot add copies after PrintAndWait has been called")
)

func (cr *copyReader) setDone(val bool) {
cr.doneLock.Lock()
cr.done = val
cr.doneLock.Unlock()
type copyReader struct {
reader io.Reader
current int64
total int64
pb *ProgressBar
}

func (cr *copyReader) Read(p []byte) (int, error) {
Expand All @@ -50,9 +40,6 @@ func (cr *copyReader) Read(p []byte) (int, error) {
if err == nil {
err = err1
}
if err != nil {
cr.setDone(true)
}
return n, err
}

Expand All @@ -66,21 +53,35 @@ func (cr *copyReader) updateProgressBar() error {
return cr.pb.SetCurrentProgress(progress)
}

// NewCopyProgressPrinter returns a new CopyProgressPrinter
func NewCopyProgressPrinter() *CopyProgressPrinter {
return &CopyProgressPrinter{results: make(chan error)}
}

// CopyProgressPrinter will perform an arbitrary number of io.Copy calls, while
// continually printing the progress of each copy.
type CopyProgressPrinter struct {
readers []*copyReader
errors []error
results chan error

lock sync.Mutex
readers []*copyReader
started bool
pbp *ProgressBarPrinter
}

// AddCopy adds a copy for this CopyProgressPrinter to perform. An io.Copy call
// will be made to copy bytes from reader to dest, and name and size will be
// used to label the progress bar and display how much progress has been made.
// If size is 0, the total size of the reader is assumed to be unknown.
func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) {
// AddCopy can only be called before PrintAndWait; otherwise, ErrAlreadyStarted
// will be returned.
func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) error {
cpp.lock.Lock()
defer cpp.lock.Unlock()

if cpp.started {
return ErrAlreadyStarted
}
if cpp.pbp == nil {
cpp.pbp = &ProgressBarPrinter{}
cpp.pbp.PadToBeEven = true
Expand All @@ -96,60 +97,56 @@ func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int6
cr.pb.SetPrintAfter(cr.formattedProgress())

cpp.readers = append(cpp.readers, cr)
cpp.lock.Unlock()

go func() {
_, err := io.Copy(dest, cr)
if err != nil {
cpp.lock.Lock()
cpp.errors = append(cpp.errors, err)
cpp.lock.Unlock()
}
cpp.results <- err
}()
return nil
}

// PrintAndWait will print the progress for each copy operation added with
// AddCopy to printTo every printInterval. This will continue until every added
// copy is finished, or until cancel is written to.
// PrintAndWait may only be called once; any subsequent calls will immediately
// return ErrAlreadyStarted. After PrintAndWait has been called, no more
// copies may be added to the CopyProgressPrinter.
func (cpp *CopyProgressPrinter) PrintAndWait(printTo io.Writer, printInterval time.Duration, cancel chan struct{}) error {
for {
// If cancel is not nil, see if anything has been written to it. If
// something has, return, otherwise keep drawing.
if cancel != nil {
select {
case <-cancel:
return nil
default:
}
}

cpp.lock.Lock()
readers := cpp.readers
errors := cpp.errors
cpp.lock.Lock()
if cpp.started {
cpp.lock.Unlock()
return ErrAlreadyStarted
}
cpp.started = true
cpp.lock.Unlock()

if len(errors) > 0 {
return errors[0]
}
n := len(cpp.readers)
if n == 0 {
// Nothing to do.
return nil
}

if len(readers) > 0 {
t := time.NewTicker(printInterval)
for i := 0; i < n; {
select {
case <-cancel:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we cancel we'll have a dangling goroutine started in AddCopy blocking on cpp.results.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can't anticipate the size in the current interface let's just add a cancel channel for the copiers to listen on while they're trying to send to cpp results

return nil
case <-t.C:
_, err := cpp.pbp.Print(printTo)
if err != nil {
return err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, this will leave a dangling goroutine started in AddCopy

}
} else {
}

allDone := true
for _, r := range readers {
allDone = allDone && r.getDone()
}
if allDone && len(readers) > 0 {
return nil
case err := <-cpp.results:
i++
if err == nil {
_, err = cpp.pbp.Print(printTo)
}
if err != nil {
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this case snippet. Due to re-using err, it is throwing away copy errors, not sure if on purpose. You are also discarding the returned bool, I think it will print multiple times on simultaneous completions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only discarding if the copy error was nil...

I'm not sure about the multiple thing - can you elaborate?

Luca Bruno notifications@github.com schrieb am So., 29. Mai 2016, 18:17:

In progressutil/iocopy.go
#63 (comment):

  •   if allDone && len(readers) > 0 {
    
  •       // We still need to check for errors again, as one may
    
  •       // have occurred between when we first checked and when
    
  •       // the readers were marked as done
    
  •       cpp.lock.Lock()
    
  •       errors = cpp.errors
    
  •       cpp.lock.Unlock()
    
  •       if len(errors) > 0 {
    
  •           return errors[0]
    
  •   case err := <-cpp.results:
    
  •       i++
    
  •       if err == nil {
    
  •           _, err = cpp.pbp.Print(printTo)
    
  •       }
    
  •       if err != nil {
    
  •           return err
    

I'm not sure about this case snippet. Due to re-using err, it is throwing
away copy errors, not sure if on purpose. You are also discarding the
returned bool, I think it will print multiple times on simultaneous
completions.


You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
https://github.com/coreos/pkg/pull/63/files/f24e4f6a55a09ae816d44b6ed1c1e09615ffcc73..08ada9715da766febf54352137eb1fa2fbbecf86#r65007478,
or mute the thread
https://github.com/notifications/unsubscribe/ACewN5_ZNgm5t1pq1DHcVCYHKfzv-PjXks5qGbv-gaJpZM4IpSVb
.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only discarding if the copy error was nil...

You are right, I misread it.

I'm not sure about the multiple thing - can you elaborate?

Print boolean signals if all copies have finished. This is ignored here, so even if all copies end within the first call, Print will be anyway called n times. (I am not sure what the observable behavior will be, I guess printing completions result n times).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, great point! I had this check in one of the versions (that's why
I noticed that allDone weirdness) but guess it fell out among the
refactoring

Luca Bruno notifications@github.com schrieb am So., 29. Mai 2016, 21:58:

In progressutil/iocopy.go
#63 (comment):

  •   if allDone && len(readers) > 0 {
    
  •       // We still need to check for errors again, as one may
    
  •       // have occurred between when we first checked and when
    
  •       // the readers were marked as done
    
  •       cpp.lock.Lock()
    
  •       errors = cpp.errors
    
  •       cpp.lock.Unlock()
    
  •       if len(errors) > 0 {
    
  •           return errors[0]
    
  •   case err := <-cpp.results:
    
  •       i++
    
  •       if err == nil {
    
  •           _, err = cpp.pbp.Print(printTo)
    
  •       }
    
  •       if err != nil {
    
  •           return err
    

It's only discarding if the copy error was nil...

You are right, I misread it.

I'm not sure about the multiple thing - can you elaborate?

Print boolean signals if all copies have finished. This is ignored here,
so even if all copies end within the first call, Print will be anyway
called n times. (I am not sure what the observable behavior will be, I
guess printing completions result n times).


You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
https://github.com/coreos/pkg/pull/63/files/f24e4f6a55a09ae816d44b6ed1c1e09615ffcc73..08ada9715da766febf54352137eb1fa2fbbecf86#r65010551,
or mute the thread
https://github.com/notifications/unsubscribe/ACewN6kc3G1eZzLdDShp5D2BecU_P9ccks5qGe_8gaJpZM4IpSVb
.

}
}

time.Sleep(printInterval)
}
return nil
}

func (cr *copyReader) formattedProgress() string {
Expand Down
4 changes: 2 additions & 2 deletions progressutil/progressbar.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func (pbp *ProgressBarPrinter) Print(printTo io.Writer) (bool, error) {
}
}

allDone := false
allDone := true
for _, bar := range bars {
if isTerminal(printTo) {
bar.printToTerminal(printTo, numColumns, pbp.PadToBeEven, pbp.maxBefore, pbp.maxAfter)
} else {
bar.printToNonTerminal(printTo)
}
allDone = allDone || bar.GetCurrentProgress() == 1
allDone = allDone && bar.GetCurrentProgress() == 1
}

pbp.numLinesInLastPrint = len(bars)
Expand Down