Add --resume-size for runner
wzshiming committed Feb 12, 2025
1 parent c9540e3 commit 7e001ee
Showing 4 changed files with 181 additions and 2 deletions.
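
The flag is exposed on the runner command as --resume-size and inside the runner package as the WithResumeSize option. As a rough sketch of how a caller might wire it up, mirroring the pattern used in runE further down (the 64 MiB threshold is a hypothetical value, the unit appears to be bytes given the size > int64(r.resumeSize) check in runner.go, and other required options such as caches and the queue client are elided):

package main

import (
	"log"

	"github.com/daocloud/crproxy/runner"
)

func main() {
	resumeSize := 64 * 1024 * 1024 // hypothetical: resume blobs larger than 64 MiB

	opts := []runner.Option{
		// ... queue client, caches, and other options elided ...
	}
	if resumeSize > 0 {
		opts = append(opts, runner.WithResumeSize(resumeSize))
	}

	r, err := runner.NewRunner(opts...)
	if err != nil {
		log.Fatal(err)
	}
	_ = r
}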
9 changes: 9 additions & 0 deletions cache/cache.go
@@ -104,6 +104,15 @@ func (c *Cache) Redirect(ctx context.Context, blobPath string, referer string) (
	return u, nil
}

func (c *Cache) Writer(ctx context.Context, cachePath string, append bool) (storagedriver.FileWriter, error) {
	return c.storageDriver.Writer(ctx, cachePath, append)
}

func (c *Cache) BlobWriter(ctx context.Context, blob string, append bool) (storagedriver.FileWriter, error) {
	cachePath := blobCachePath(blob)
	return c.Writer(ctx, cachePath, append)
}

func (c *Cache) put(ctx context.Context, cachePath string, r io.Reader, checkFunc func(int64) error) (int64, error) {
	fw, err := c.storageDriver.Writer(ctx, cachePath, false)
	if err != nil {
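
The two helpers above expose the storage driver's append mode through the cache layer; the runner then uses the writer's Size() as the resume offset. A minimal sketch of that idea, assuming a *cache.Cache constructed elsewhere and the docker/distribution driver package already used by this file (resumeOffset is a hypothetical helper, not part of this commit):

package example

import (
	"context"

	"github.com/daocloud/crproxy/cache"
	storagedriver "github.com/docker/distribution/registry/storage/driver"
)

// resumeOffset reopens a blob in append mode and reports how many bytes are
// already stored, i.e. the offset a resumed download should continue from.
func resumeOffset(ctx context.Context, c *cache.Cache, blob string) (storagedriver.FileWriter, int64, error) {
	fw, err := c.BlobWriter(ctx, blob, true) // append=true keeps existing data
	if err != nil {
		return nil, 0, err
	}
	return fw, fw.Size(), nil
}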
8 changes: 8 additions & 0 deletions cmd/crproxy/cluster/runner/runner.go
@@ -31,6 +31,8 @@ type flagpole struct {
	BigStorageURL string
	BigStorageSize int

	ResumeSize int

	StorageURL []string
	Quick bool
	Platform []string
@@ -65,6 +67,8 @@ func NewCommand() *cobra.Command {
	cmd.Flags().StringArrayVar(&flags.StorageURL, "storage-url", flags.StorageURL, "Storage driver url")
	cmd.Flags().StringVar(&flags.BigStorageURL, "big-storage-url", flags.BigStorageURL, "Big storage driver url")
	cmd.Flags().IntVar(&flags.BigStorageSize, "big-storage-size", flags.BigStorageSize, "Big storage size")
	cmd.Flags().IntVar(&flags.ResumeSize, "resume-size", flags.ResumeSize, "Resume size")

	cmd.Flags().StringVar(&flags.ManifestStorageURL, "manifest-storage-url", flags.ManifestStorageURL, "manifest storage driver url")
	cmd.Flags().BoolVar(&flags.Quick, "quick", flags.Quick, "Quick sync with tags")
	cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
@@ -197,6 +201,10 @@ func runE(ctx context.Context, flags *flagpole) error {
		opts = append(opts, runner.WithManifestCache(manifestsdcache))
	}

	if flags.ResumeSize > 0 {
		opts = append(opts, runner.WithResumeSize(flags.ResumeSize))
	}

	runner, err := runner.NewRunner(opts...)
	if err != nil {
		return err
161 changes: 160 additions & 1 deletion runner/runner.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log/slog"
"math"
"net/http"
"net/url"
"sort"
@@ -19,10 +20,14 @@ import (
"github.com/daocloud/crproxy/internal/spec"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/queue/model"
"github.com/daocloud/crproxy/storage"
csync "github.com/daocloud/crproxy/sync"
"github.com/wzshiming/httpseek"
)

type Runner struct {
resumeSize int

bigCacheSize int
bigCache *cache.Cache
manifestCache *cache.Cache
@@ -96,6 +101,12 @@ func WithManifestCache(cache *cache.Cache) Option {
	}
}

func WithResumeSize(resumeSize int) Option {
	return func(c *Runner) {
		c.resumeSize = resumeSize
	}
}

func NewRunner(opts ...Option) (*Runner, error) {
	r := &Runner{
		httpClient: http.DefaultClient,
@@ -426,11 +437,137 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
		r.logger.Info("skip blob by cache", "digest", blob)
		return nil
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return err
	}

	if r.resumeSize != 0 && size > int64(r.resumeSize) {
		if len(subCaches) == 1 {
			f, err := subCaches[0].BlobWriter(ctx, blob, true)
			if err == nil {

				seeker := httpseek.NewSeeker(ctx, http.DefaultTransport, req)

				_, err = seeker.Seek(f.Size(), 0)
				if err != nil {
					return err
				}

				progress.Store(f.Size())

				body := &readerCounter{
					r:       seeker,
					counter: progress,
				}
				_, err = io.Copy(f, body)
				if err != nil {
					return err
				}

				err = f.Commit()
				if err != nil {
					return err
				}

				fi, err := subCaches[0].StatBlob(ctx, blob)
				if err != nil {
					return err
				}

				if fi.Size() != size {
					err := subCaches[0].DeleteBlob(ctx, blob)
					if err != nil {
						return fmt.Errorf("%s is %d, but expected %d: %w", blob, fi.Size(), size, err)
					}
					return fmt.Errorf("%s is %d, but expected %d", blob, fi.Size(), size)
				}
				return nil
			}
		} else {
			var offset int64 = math.MaxInt

			rbws := []storage.FileWriter{}
			for _, cache := range subCaches {
				f, err := cache.BlobWriter(ctx, blob, true)
				if err == nil {
					if offset != 0 {
						offset = min(offset, f.Size())
					}
					rbws = append(rbws, f)

				} else {
					offset = 0
					f, err = cache.BlobWriter(ctx, blob, false)
					if err != nil {
						return err
					}
					rbws = append(rbws, f)
				}
			}

			var writers []io.Writer
			for _, w := range rbws {
				n := w.Size() - offset
				if n == 0 {
					writers = append(writers, w)
				} else if n > 0 {
					writers = append(writers, &skipWriter{
						writer: w,
						offset: uint64(n),
					})
				} else {
					panic("crproxy.runner: resume write blob error")
				}
			}

			seeker := httpseek.NewSeeker(ctx, http.DefaultTransport, req)

			_, err := seeker.Seek(offset, 0)
			if err != nil {
				return err
			}

			progress.Store(offset)

			body := &readerCounter{
				r:       seeker,
				counter: progress,
			}

			_, err = io.Copy(io.MultiWriter(writers...), body)
			if err != nil {
				return fmt.Errorf("copy blob failed: %w", err)
			}

			var errs []error
			for _, c := range rbws {
				err := c.Commit()
				if err != nil {
					errs = append(errs, err)
				}
			}

			for _, cache := range subCaches {
				fi, err := cache.StatBlob(ctx, blob)
				if err != nil {
					errs = append(errs, err)
					continue
				}

				if fi.Size() != size {
					if err := cache.DeleteBlob(ctx, blob); err != nil {
						errs = append(errs, fmt.Errorf("%s is %d, but expected %d: %w", blob, fi.Size(), size, err))
					} else {
						errs = append(errs, fmt.Errorf("%s is %d, but expected %d", blob, fi.Size(), size))
					}
				}
			}

			return nil
		}
	}

	resp, err := r.httpClient.Do(req)
	if err != nil {
		return err
@@ -869,3 +1006,25 @@ func (r *readerCounter) Read(b []byte) (int, error) {
	r.counter.Add(int64(n))
	return n, err
}

type skipWriter struct {
	writer  io.Writer
	offset  uint64
	skipped uint64
}

func (w *skipWriter) Write(p []byte) (int, error) {
	if w.skipped >= w.offset {
		return w.writer.Write(p)
	}

	remaining := w.offset - w.skipped
	if uint64(len(p)) <= remaining {
		w.skipped += uint64(len(p))
		return len(p), nil
	}

	w.skipped = w.offset
	written, err := w.writer.Write(p[remaining:])
	return int(remaining) + written, err
}
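
skipWriter discards the first offset bytes it is asked to write and passes everything after that through, which is what lets a single upstream stream feed several caches that had resumed at different offsets. A hypothetical in-package test sketch of that behaviour (not part of this commit):

package runner

import (
	"bytes"
	"testing"
)

// TestSkipWriterSkipsPrefix checks that a skipWriter drops the first offset
// bytes while still reporting the full input length as written.
func TestSkipWriterSkipsPrefix(t *testing.T) {
	var buf bytes.Buffer
	w := &skipWriter{writer: &buf, offset: 4}

	n, err := w.Write([]byte("0123456789"))
	if err != nil || n != 10 {
		t.Fatalf("n=%d err=%v", n, err)
	}
	if got := buf.String(); got != "456789" {
		t.Fatalf("buffer = %q, want %q", got, "456789")
	}
}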
5 changes: 4 additions & 1 deletion storage/storage.go
@@ -9,7 +9,10 @@ import (
"github.com/docker/distribution/registry/storage/driver/factory"
)

type StorageDriver = driver.StorageDriver
type (
FileWriter = driver.FileWriter
StorageDriver = driver.StorageDriver
)

func NewStorage(uri string) (StorageDriver, error) {
u, err := url.Parse(uri)
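
Exporting FileWriter from the storage package is what allows runner.go to hold the per-cache writers as []storage.FileWriter. Putting the pieces together, the single-cache resume path added in this commit reduces to: reopen the blob in append mode, seek the upstream body to the bytes already on disk, copy the remainder, and commit. A condensed, hypothetical sketch using only the calls that appear in the diff (error handling trimmed, URL a placeholder, and the httpseek seeker treated as a readable seeker as its usage above suggests):

package example

import (
	"context"
	"io"
	"net/http"

	"github.com/daocloud/crproxy/cache"
	"github.com/wzshiming/httpseek"
)

// resumeBlob is a hypothetical condensed version of the single-cache branch:
// it appends to the partially cached blob instead of re-downloading it.
func resumeBlob(ctx context.Context, c *cache.Cache, blob, url string) error {
	fw, err := c.BlobWriter(ctx, blob, true) // append to whatever is cached
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	// Seek the HTTP body to the bytes already stored, then stream the rest.
	seeker := httpseek.NewSeeker(ctx, http.DefaultTransport, req)
	if _, err := seeker.Seek(fw.Size(), io.SeekStart); err != nil {
		return err
	}
	if _, err := io.Copy(fw, seeker); err != nil {
		return err
	}
	return fw.Commit()
}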
