From ef7a794e4b21299ab95496b380aba4bc37e63667 Mon Sep 17 00:00:00 2001 From: Shiming Zhang Date: Wed, 5 Feb 2025 14:20:19 +0800 Subject: [PATCH] Support blob to queue --- agent/agent.go | 72 +++++++++ cmd/crproxy/cluster/agent/agent.go | 9 ++ cmd/crproxy/cluster/runner/runner.go | 54 ++++++- gateway/gateway.go | 9 +- gateway/manifest.go | 4 +- queue/client/client.go | 6 +- queue/controller/message.go | 30 +++- queue/dao/messgae.go | 6 +- queue/model/message.go | 7 + runner/runner.go | 214 ++++++++++++++++++++++++++- 10 files changed, 387 insertions(+), 24 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index e20fe6f..90be96b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -16,6 +16,8 @@ import ( "github.com/daocloud/crproxy/cache" "github.com/daocloud/crproxy/internal/queue" "github.com/daocloud/crproxy/internal/utils" + "github.com/daocloud/crproxy/queue/client" + "github.com/daocloud/crproxy/queue/model" "github.com/daocloud/crproxy/token" "github.com/docker/distribution/registry/api/errcode" "github.com/google/go-containerregistry/pkg/v1/remote/transport" @@ -53,6 +55,8 @@ type Agent struct { authenticator *token.Authenticator blobsLENoAgent int + + queueClient *client.MessageClient } type Option func(c *Agent) error @@ -112,6 +116,13 @@ func WithConcurrency(concurrency int) Option { } } +func WithQueueClient(queueClient *client.MessageClient) Option { + return func(c *Agent) error { + c.queueClient = queueClient + return nil + } +} + func NewAgent(opts ...Option) (*Agent, error) { c := &Agent{ logger: slog.Default(), @@ -182,6 +193,15 @@ func (c *Agent) worker(ctx context.Context) { if !ok { return } + + if c.queueClient != nil { + _, err := c.waitingQueue(ctx, info.Blobs, weight, &info) + if err == nil { + finish() + continue + } + c.logger.Warn("waitingQueue error", "info", info, "error", err) + } size, continueFunc, sc, err := c.cacheBlob(&info) if err != nil { c.logger.Warn("failed download file request", "info", info, "error", err) @@ -489,3 +509,55 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st c.logger.Info("Cache hit", "digest", blob, "url", u) http.Redirect(rw, r, u, http.StatusTemporaryRedirect) } + +func (c *Agent) waitingQueue(ctx context.Context, msg string, weight int, info *BlobInfo) (client.MessageResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + mr, err := c.queueClient.Create(ctx, msg, weight+1, model.MessageAttr{ + Host: info.Host, + Image: info.Image, + }) + if err != nil { + return client.MessageResponse{}, fmt.Errorf("failed to create queue: %w", err) + } + + if mr.Status == model.StatusPending { + c.logger.Info("watching message from queue", "msg", msg) + + chMr, err := c.queueClient.Watch(ctx, mr.MessageID) + if err != nil { + return client.MessageResponse{}, fmt.Errorf("failed to watch message: %w", err) + } + watiQueue: + for { + select { + case <-ctx.Done(): + return client.MessageResponse{}, ctx.Err() + case m, ok := <-chMr: + if !ok { + if mr.Status != model.StatusPending && mr.Status != model.StatusProcessing { + break watiQueue + } + + time.Sleep(1 * time.Second) + chMr, err = c.queueClient.Watch(ctx, mr.MessageID) + if err != nil { + return client.MessageResponse{}, fmt.Errorf("failed to re-watch message: %w", err) + } + } else { + mr = m + } + } + } + } + + switch mr.Status { + case model.StatusCompleted: + return mr, nil + case model.StatusFailed: + return client.MessageResponse{}, fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error) + default: + return client.MessageResponse{}, fmt.Errorf("unexpected status %q for message %q", mr.Status, msg) + } +} diff --git a/cmd/crproxy/cluster/agent/agent.go b/cmd/crproxy/cluster/agent/agent.go index 6f2fb95..653cfb0 100644 --- a/cmd/crproxy/cluster/agent/agent.go +++ b/cmd/crproxy/cluster/agent/agent.go @@ -14,6 +14,7 @@ import ( "github.com/daocloud/crproxy/cache" "github.com/daocloud/crproxy/internal/pki" "github.com/daocloud/crproxy/internal/server" + "github.com/daocloud/crproxy/queue/client" "github.com/daocloud/crproxy/signing" "github.com/daocloud/crproxy/storage" "github.com/daocloud/crproxy/token" @@ -47,6 +48,9 @@ type flagpole struct { BlobCacheDuration time.Duration Concurrency int + + QueueURL string + QueueToken string } func NewCommand() *cobra.Command { @@ -132,6 +136,11 @@ func runE(ctx context.Context, flags *flagpole) error { agent.WithConcurrency(flags.Concurrency), ) + if flags.QueueURL != "" { + queueClient := client.NewMessageClient(http.DefaultClient, flags.QueueURL, flags.QueueToken) + opts = append(opts, agent.WithQueueClient(queueClient)) + } + if flags.TokenPublicKeyFile != "" { publicKeyData, err := os.ReadFile(flags.TokenPublicKeyFile) if err != nil { diff --git a/cmd/crproxy/cluster/runner/runner.go b/cmd/crproxy/cluster/runner/runner.go index 2ae591a..d3f477b 100644 --- a/cmd/crproxy/cluster/runner/runner.go +++ b/cmd/crproxy/cluster/runner/runner.go @@ -18,6 +18,7 @@ import ( "github.com/daocloud/crproxy/transport" "github.com/docker/distribution/manifest/manifestlist" "github.com/spf13/cobra" + "github.com/wzshiming/httpseek" ) type flagpole struct { @@ -25,11 +26,13 @@ type flagpole struct { AdminToken string - StorageURL []string - Deep bool - Quick bool - Platform []string - Userpass []string + StorageURL []string + Deep bool + Quick bool + Platform []string + Userpass []string + Retry int + RetryInterval time.Duration Lease string @@ -61,7 +64,8 @@ func NewCommand() *cobra.Command { cmd.Flags().BoolVar(&flags.Quick, "quick", flags.Quick, "Quick sync with tags") cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform") cmd.Flags().StringArrayVarP(&flags.Userpass, "user", "u", flags.Userpass, "host and username and password -u user:pwd@host") - + cmd.Flags().IntVar(&flags.Retry, "retry", flags.Retry, "Retry") + cmd.Flags().DurationVar(&flags.RetryInterval, "retry-interval", flags.RetryInterval, "Retry interval") cmd.Flags().DurationVar(&flags.Duration, "duration", flags.Duration, "Duration of the runner") cmd.Flags().StringVar(&flags.Lease, "lease", flags.Lease, "Lease of the runner") @@ -125,7 +129,43 @@ func runE(ctx context.Context, flags *flagpole) error { lease = fmt.Sprintf("%s-%d", flags.Lease, time.Now().Unix()) } - runner, err := runner.NewRunner(http.DefaultClient, lease, flags.QueueURL, flags.AdminToken, sm) + if flags.RetryInterval > 0 { + tp = httpseek.NewMustReaderTransport(tp, func(request *http.Request, retry int, err error) error { + if errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) { + return err + } + if flags.Retry > 0 && retry >= flags.Retry { + return err + } + if logger != nil { + logger.Warn("Retry", "url", request.URL, "retry", retry, "error", err) + } + time.Sleep(flags.RetryInterval) + return nil + }) + } + + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) > 10 { + return http.ErrUseLastResponse + } + s := make([]string, 0, len(via)+1) + for _, v := range via { + s = append(s, v.URL.String()) + } + + lastRedirect := req.URL.String() + s = append(s, lastRedirect) + logger.Info("redirect", "redirects", s) + + return nil + }, + Transport: tp, + } + + runner, err := runner.NewRunner(client, caches, lease, flags.QueueURL, flags.AdminToken, sm) if err != nil { return err } diff --git a/gateway/gateway.go b/gateway/gateway.go index aacd6f8..4dc5c9c 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -171,13 +171,20 @@ func NewGateway(opts ...Option) (*Gateway, error) { } if c.cache != nil { - a, err := agent.NewAgent( + opts := []agent.Option{ agent.WithClient(c.httpClient), agent.WithAuthenticator(c.authenticator), agent.WithLogger(c.logger), agent.WithCache(c.cache), agent.WithBlobsLENoAgent(c.blobsLENoAgent), agent.WithConcurrency(c.concurrency), + } + if c.queueClient != nil { + opts = append(opts, agent.WithQueueClient(c.queueClient)) + } + + a, err := agent.NewAgent( + opts..., ) if err != nil { return nil, fmt.Errorf("failed to create agent: %w", err) diff --git a/gateway/manifest.go b/gateway/manifest.go index e65e377..da37aec 100644 --- a/gateway/manifest.go +++ b/gateway/manifest.go @@ -69,7 +69,7 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro ctx, cancel := context.WithCancel(ctx) defer cancel() - mr, err := c.queueClient.Create(ctx, msg, weight+1) + mr, err := c.queueClient.Create(ctx, msg, weight+1, model.MessageAttr{}) if err != nil { return fmt.Errorf("failed to create queue: %w", err) } @@ -163,7 +163,7 @@ func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) { if err == nil { if cachedDigest != digest { msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests) - _, err := c.queueClient.Create(context.Background(), msg, 0) + _, err := c.queueClient.Create(context.Background(), msg, 0, model.MessageAttr{}) if err != nil { c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err) } else { diff --git a/queue/client/client.go b/queue/client/client.go index f3d1732..646c177 100644 --- a/queue/client/client.go +++ b/queue/client/client.go @@ -16,6 +16,8 @@ import ( type MessageRequest struct { Content string `json:"content"` Priority int `json:"priority"` + + Data model.MessageAttr `json:"data,omitempty"` } type MessageResponse struct { @@ -63,8 +65,8 @@ func NewMessageClient(httpClient *http.Client, baseURL string, adminToken string } } -func (c *MessageClient) Create(ctx context.Context, content string, priority int) (MessageResponse, error) { - messageRequest := MessageRequest{Content: content, Priority: priority} +func (c *MessageClient) Create(ctx context.Context, content string, priority int, data model.MessageAttr) (MessageResponse, error) { + messageRequest := MessageRequest{Content: content, Priority: priority, Data: data} body, err := json.Marshal(messageRequest) if err != nil { return MessageResponse{}, err diff --git a/queue/controller/message.go b/queue/controller/message.go index 6b09ec2..4423732 100644 --- a/queue/controller/message.go +++ b/queue/controller/message.go @@ -19,6 +19,8 @@ import ( type MessageRequest struct { Content string `json:"content"` Priority int `json:"priority"` + + Data model.MessageAttr `json:"data,omitempty"` } type MessageResponse struct { @@ -304,6 +306,7 @@ func (mc *MessageController) Create(req *restful.Request, resp *restful.Response newMessage := model.Message{ Content: messageRequest.Content, Priority: messageRequest.Priority, + Data: messageRequest.Data, } messageID, err := mc.messageService.Create(req.Request.Context(), newMessage) if err != nil { @@ -315,6 +318,7 @@ func (mc *MessageController) Create(req *restful.Request, resp *restful.Response MessageID: messageID, Content: messageRequest.Content, Priority: messageRequest.Priority, + Data: messageRequest.Data, } mc.updateWatchListChannels(data) @@ -498,14 +502,18 @@ func (mc *MessageController) Heartbeat(req *restful.Request, resp *restful.Respo return } - if err := mc.messageService.Heartbeat(req.Request.Context(), messageID, heartbeatRequest.Data, heartbeatRequest.Lease); err != nil { - resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()}) + curr, err := mc.messageService.GetByID(req.Request.Context(), messageID) + if err != nil { + resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()}) return } - curr, err := mc.messageService.GetByID(context.Background(), messageID) - if err != nil { - resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()}) + curr.Data.Blobs = heartbeatRequest.Data.Blobs + curr.Data.Progress = heartbeatRequest.Data.Progress + curr.Data.Size = heartbeatRequest.Data.Size + + if err := mc.messageService.Heartbeat(req.Request.Context(), messageID, curr.Data, heartbeatRequest.Lease); err != nil { + resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()}) return } @@ -574,12 +582,20 @@ func (mc *MessageController) Failed(req *restful.Request, resp *restful.Response return } - if err := mc.messageService.Failed(req.Request.Context(), messageID, failedRequest.Lease, failedRequest.Data); err != nil { + curr, err := mc.messageService.GetByID(context.Background(), messageID) + if err != nil { + resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()}) + return + } + + curr.Data.Error = failedRequest.Data.Error + + if err := mc.messageService.Failed(req.Request.Context(), messageID, failedRequest.Lease, curr.Data); err != nil { resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()}) return } - curr, err := mc.messageService.GetByID(context.Background(), messageID) + curr, err = mc.messageService.GetByID(context.Background(), messageID) if err != nil { resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after failure: " + err.Error()}) return diff --git a/queue/dao/messgae.go b/queue/dao/messgae.go index 397fc2a..e6b5719 100644 --- a/queue/dao/messgae.go +++ b/queue/dao/messgae.go @@ -44,7 +44,7 @@ INSERT INTO messages (content, lease, priority, status, data) VALUES (?, ?, ?, ? func (m *Message) Create(ctx context.Context, message model.Message) (int64, error) { db := GetDB(ctx) - result, err := db.ExecContext(ctx, createMessageSQL, message.Content, message.Lease, message.Priority, model.StatusPending, "{}") + result, err := db.ExecContext(ctx, createMessageSQL, message.Content, message.Lease, message.Priority, model.StatusPending, message.Data) if err != nil { return 0, fmt.Errorf("failed to create message: %w", err) } @@ -112,12 +112,12 @@ func (m *Message) UpdatePriorityByID(ctx context.Context, id int64, priority int } const deleteMessageByIDSQL = ` -UPDATE messages SET delete_at = NOW(), data = ? WHERE id = ? AND delete_at IS NULL +UPDATE messages SET delete_at = NOW() WHERE id = ? AND delete_at IS NULL ` func (m *Message) DeleteByID(ctx context.Context, id int64) error { db := GetDB(ctx) - _, err := db.ExecContext(ctx, deleteMessageByIDSQL, model.MessageAttr{}, id) + _, err := db.ExecContext(ctx, deleteMessageByIDSQL, id) if err != nil { return fmt.Errorf("failed to delete message: %w", err) } diff --git a/queue/model/message.go b/queue/model/message.go index 3b8c246..f7afa70 100644 --- a/queue/model/message.go +++ b/queue/model/message.go @@ -26,6 +26,13 @@ type Message struct { type MessageAttr struct { Error string `json:"error,omitempty"` + + Host string `json:"host,omitempty"` + Image string `json:"image,omitempty"` + Progress int64 `json:"progress,omitempty"` + Size int64 `json:"size,omitempty"` + + // Deprecate Blobs []Blob `json:"blobs,omitempty"` } diff --git a/runner/runner.go b/runner/runner.go index 16b0429..09eae3b 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -4,18 +4,25 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "net/http" + "net/url" "sort" + "strings" "sync" + "sync/atomic" "time" + "github.com/daocloud/crproxy/cache" "github.com/daocloud/crproxy/queue/client" "github.com/daocloud/crproxy/queue/model" csync "github.com/daocloud/crproxy/sync" ) type Runner struct { + caches []*cache.Cache + httpClient *http.Client client *client.MessageClient syncManager *csync.SyncManager lease string @@ -24,9 +31,11 @@ type Runner struct { syncCh chan struct{} } -func NewRunner(httpClient *http.Client, lease, baseURL string, adminToken string, syncManager *csync.SyncManager) (*Runner, error) { +func NewRunner(httpClient *http.Client, caches []*cache.Cache, lease, baseURL string, adminToken string, syncManager *csync.SyncManager) (*Runner, error) { cli := client.NewMessageClient(httpClient, baseURL, adminToken) return &Runner{ + httpClient: httpClient, + caches: caches, client: cli, lease: lease, syncManager: syncManager, @@ -155,7 +164,196 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger return errors.Join(errs...) } - err = r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{ + if strings.HasPrefix(resp.Content, "sha256:") { + if resp.Data.Host == "" || resp.Data.Image == "" { + return nil + } + return r.blobSync(ctx, id, resp, logger) + } + + return r.imageSync(ctx, id, resp, logger) +} + +func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64, gotSize, progress *atomic.Int64, logger *slog.Logger) error { + var subCaches []*cache.Cache + for _, cache := range r.caches { + stat, err := cache.StatBlob(ctx, blob) + if err == nil { + if size > 0 { + gotSize := stat.Size() + if size == gotSize { + continue + } + logger.Error("size is not meeting expectations", "digest", blob, "size", size, "gotSize", gotSize) + } else { + continue + } + } + subCaches = append(subCaches, cache) + } + + if len(subCaches) == 0 { + logger.Info("skip blob by cache", "digest", blob) + return nil + } + + u := &url.URL{ + Scheme: "https", + Host: host, + Path: fmt.Sprintf("/v2/%s/blobs/%s", name, blob), + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return err + } + resp, err := r.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to retrieve blob: status code %d", resp.StatusCode) + } + + logger.Info("start sync blob", "digest", blob, "url", u.String()) + + if resp.ContentLength > 0 && size > 0 { + if resp.ContentLength != size { + return fmt.Errorf("failed to retrieve blob: expected size %d, got %d", size, resp.ContentLength) + } + } + + if size > 0 { + gotSize.Store(size) + } + if resp.ContentLength > 0 { + gotSize.Store(resp.ContentLength) + } + + body := &readerCounter{ + r: resp.Body, + counter: progress, + } + + if len(subCaches) == 1 { + n, err := subCaches[0].PutBlob(ctx, blob, body) + if err != nil { + return fmt.Errorf("put blob failed: %w", err) + } + + logger.Info("finish sync blob", "digest", blob, "size", n) + return nil + } + + var writers []io.Writer + var closers []io.Closer + var wg sync.WaitGroup + + for _, ca := range subCaches { + pr, pw := io.Pipe() + writers = append(writers, pw) + closers = append(closers, pw) + wg.Add(1) + go func(cache *cache.Cache, pr io.Reader) { + defer wg.Done() + _, err := cache.PutBlob(ctx, blob, pr) + if err != nil { + logger.Error("put blob failed", "digest", blob, "error", err) + io.Copy(io.Discard, pr) + return + } + }(ca, pr) + } + + n, err := io.Copy(io.MultiWriter(writers...), body) + if err != nil { + return fmt.Errorf("copy blob failed: %w", err) + } + for _, c := range closers { + c.Close() + } + + wg.Wait() + + logger.Info("finish sync blob", "digest", blob, "size", n) + return nil +} + +func (r *Runner) blobSync(ctx context.Context, id string, resp client.MessageResponse, logger *slog.Logger) error { + err := r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{ + Lease: id, + }) + if err != nil { + _ = r.client.Cancel(ctx, resp.MessageID, client.CancelRequest{ + Lease: id, + }) + return err + } + + var errCh = make(chan error, 1) + d := resp.Data + + var gotSize, progress atomic.Int64 + + go func() { + errCh <- r.blob(ctx, d.Host, d.Image, resp.Content, d.Size, &gotSize, &progress, logger) + }() + + ticker := time.NewTicker(10 * time.Second) + + for { + select { + case <-ticker.C: + err := r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{ + Lease: id, + Data: model.MessageAttr{ + Size: gotSize.Load(), + Progress: progress.Load(), + }, + }) + + if err != nil { + logger.Error("Heartbeat", "error", err) + } + + case err := <-errCh: + if err == nil { + _ = r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{ + Lease: id, + Data: model.MessageAttr{ + Size: gotSize.Load(), + Progress: progress.Load(), + }, + }) + return r.client.Complete(ctx, resp.MessageID, client.CompletedRequest{ + Lease: id, + }) + } + + if errors.Is(err, context.Canceled) { + return r.client.Cancel(ctx, resp.MessageID, client.CancelRequest{ + Lease: id, + }) + } + + return r.client.Failed(ctx, resp.MessageID, client.FailedRequest{ + Lease: id, + Data: model.MessageAttr{ + Error: err.Error(), + }, + }) + case <-ctx.Done(): + return r.client.Cancel(ctx, resp.MessageID, client.CancelRequest{ + Lease: id, + }) + } + } +} + +func (r *Runner) imageSync(ctx context.Context, id string, resp client.MessageResponse, logger *slog.Logger) error { + err := r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{ Lease: id, }) if err != nil { @@ -251,3 +449,15 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger } } } + +type readerCounter struct { + r io.Reader + counter *atomic.Int64 +} + +func (r *readerCounter) Read(b []byte) (int, error) { + n, err := r.r.Read(b) + + r.counter.Add(int64(n)) + return n, err +}