Skip to content

Commit

Permalink
Add big storage for runner
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 10, 2025
1 parent b3f9951 commit 00f0183
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 25 deletions.
24 changes: 22 additions & 2 deletions cmd/crproxy/cluster/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
)

type flagpole struct {
QueueURL string

QueueURL string
QueueToken string

BigStorageURL string
BigStorageSize int

StorageURL []string
Quick bool
Platform []string
Expand Down Expand Up @@ -59,6 +61,8 @@ func NewCommand() *cobra.Command {
cmd.Flags().StringVar(&flags.QueueURL, "queue-url", flags.QueueURL, "Queue URL")

cmd.Flags().StringArrayVar(&flags.StorageURL, "storage-url", flags.StorageURL, "Storage driver url")
cmd.Flags().StringVar(&flags.BigStorageURL, "big-storage-url", flags.BigStorageURL, "Big storage driver url")
cmd.Flags().IntVar(&flags.BigStorageSize, "big-storage-size", flags.BigStorageSize, "Big storage size")
cmd.Flags().BoolVar(&flags.Quick, "quick", flags.Quick, "Quick sync with tags")
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
cmd.Flags().StringArrayVarP(&flags.Userpass, "user", "u", flags.Userpass, "host and username and password -u user:pwd@host")
Expand Down Expand Up @@ -158,6 +162,22 @@ func runE(ctx context.Context, flags *flagpole) error {
runner.WithFilterPlatform(filterPlatform(flags.Platform)),
}

if flags.BigStorageURL != "" && flags.BigStorageSize > 0 {
bigCacheOpts := []cache.Option{}
sd, err := storage.NewStorage(flags.BigStorageURL)
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
bigCacheOpts = append(bigCacheOpts,
cache.WithStorageDriver(sd),
)
bigsdcache, err := cache.NewCache(bigCacheOpts...)
if err != nil {
return fmt.Errorf("create cache failed: %w", err)
}
opts = append(opts, runner.WithBigCache(bigsdcache, flags.BigStorageSize))
}

runner, err := runner.NewRunner(opts...)
if err != nil {
return err
Expand Down
80 changes: 57 additions & 23 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
)

type Runner struct {
bigCacheSize int
bigCache *cache.Cache

caches []*cache.Cache
httpClient *http.Client
queueClient *client.MessageClient
Expand Down Expand Up @@ -79,6 +82,13 @@ func WithLease(lease string) Option {
}
}

func WithBigCache(cache *cache.Cache, size int) Option {
return func(c *Runner) {
c.bigCache = cache
c.bigCacheSize = size
}
}

func NewRunner(opts ...Option) (*Runner, error) {
r := &Runner{
httpClient: http.DefaultClient,
Expand Down Expand Up @@ -355,19 +365,52 @@ func (r *Runner) runOnceManifestSync(ctx context.Context) error {
}

func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64, gotSize, progress *atomic.Int64) error {
u := &url.URL{
Scheme: "https",
Host: host,
Path: fmt.Sprintf("/v2/%s/blobs/%s", name, blob),
}

if size == 0 {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
if err != nil {
return err
}
resp, err := r.httpClient.Do(req)
if err != nil {
return err
}
if resp.Body != nil {
_ = resp.Body.Close()
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to head blob: status code %d", resp.StatusCode)
}
size = resp.ContentLength
}

if size > 0 {
gotSize.Store(size)
}

var caches []*cache.Cache

if r.bigCache != nil && r.bigCacheSize > 0 && size >= int64(r.bigCacheSize) {
caches = append([]*cache.Cache{r.bigCache}, r.caches...)
} else {
caches = append(caches, r.caches...)
}

var subCaches []*cache.Cache
for _, cache := range r.caches {
for _, cache := range caches {
stat, err := cache.StatBlob(ctx, blob)
if err == nil {
if size > 0 {
gotSize := stat.Size()
if size == gotSize {
continue
}
r.logger.Error("size is not meeting expectations", "digest", blob, "size", size, "gotSize", gotSize)
} else {
gotSize := stat.Size()
if size == gotSize {
continue
}
r.logger.Error("size is not meeting expectations", "digest", blob, "size", size, "gotSize", gotSize)
}
subCaches = append(subCaches, cache)
}
Expand All @@ -377,12 +420,6 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
return nil
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: fmt.Sprintf("/v2/%s/blobs/%s", name, blob),
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return err
Expand All @@ -394,7 +431,7 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to retrieve blob: status code %d", resp.StatusCode)
return fmt.Errorf("failed to get blob: status code %d", resp.StatusCode)
}

r.logger.Info("start sync blob", "digest", blob, "url", u.String())
Expand All @@ -405,13 +442,6 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
}
}

if size > 0 {
gotSize.Store(size)
}
if resp.ContentLength > 0 {
gotSize.Store(resp.ContentLength)
}

body := &readerCounter{
r: resp.Body,
counter: progress,
Expand Down Expand Up @@ -609,6 +639,10 @@ func (r *Runner) manifest(ctx context.Context, messageID int64, host, image, tag
_ = resp.Body.Close()
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to head manifest: status code %d", resp.StatusCode)
}

digest := resp.Header.Get("Docker-Content-Digest")
if digest != "" {
for _, cache := range r.caches {
Expand Down Expand Up @@ -651,7 +685,7 @@ func (r *Runner) manifest(ctx context.Context, messageID int64, host, image, tag
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to retrieve manifest: status code %d", resp.StatusCode)
return fmt.Errorf("failed to get manifest: status code %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
Expand Down

0 comments on commit 00f0183

Please sign in to comment.