diff --git a/gateway/manifest.go b/gateway/manifest.go index 9f677f7..f3b163b 100644 --- a/gateway/manifest.go +++ b/gateway/manifest.go @@ -2,6 +2,8 @@ package gateway import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -11,7 +13,6 @@ import ( "strconv" "time" - "github.com/daocloud/crproxy/internal/spec" "github.com/daocloud/crproxy/internal/utils" "github.com/daocloud/crproxy/queue/model" "github.com/daocloud/crproxy/token" @@ -21,12 +22,12 @@ import ( func (c *Gateway) worker(ctx context.Context) { for { - info, weight, finish, ok := c.queue.GetOrWaitWithDone(ctx.Done()) + info, _, finish, ok := c.queue.GetOrWaitWithDone(ctx.Done()) if !ok { return } - sc, err := c.cacheManifest(&info, weight) + sc, err := c.cacheManifest(&info) if err != nil { c.manifestCache.PutError(&info, err, sc) } @@ -66,27 +67,30 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request, utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0) } -func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) error { +func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) ([]byte, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() mr, err := c.queueClient.Create(ctx, msg, weight+1, model.MessageAttr{}) if err != nil { - return fmt.Errorf("failed to create queue: %w", err) + return nil, fmt.Errorf("failed to create queue: %w", err) } - if mr.Status == model.StatusPending { + if len(mr.Data.Spec) != 0 { + return mr.Data.Spec, nil + } + if mr.Status == model.StatusPending || mr.Status == model.StatusProcessing { c.logger.Info("watching message from queue", "msg", msg) chMr, err := c.queueClient.Watch(ctx, mr.MessageID) if err != nil { - return fmt.Errorf("failed to watch message: %w", err) + return nil, fmt.Errorf("failed to watch message: %w", err) } watiQueue: for { select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case m, ok := <-chMr: if !ok { if mr.Status != model.StatusPending && mr.Status != model.StatusProcessing { @@ -96,9 +100,12 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro time.Sleep(1 * time.Second) chMr, err = c.queueClient.Watch(ctx, mr.MessageID) if err != nil { - return fmt.Errorf("failed to re-watch message: %w", err) + return nil, fmt.Errorf("failed to re-watch message: %w", err) } } else { + if len(m.Data.Spec) != 0 { + return m.Data.Spec, nil + } mr = m } } @@ -107,15 +114,15 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro switch mr.Status { case model.StatusCompleted: - return nil + return nil, nil case model.StatusFailed: - return fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error) + return nil, fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error) default: - return fmt.Errorf("unexpected status %q for message %q", mr.Status, msg) + return nil, fmt.Errorf("unexpected status %d for message %q", mr.Status, msg) } } -func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) { +func (c *Gateway) cacheManifest(info *PathInfo) (int, error) { ctx := context.Background() u := &url.URL{ Scheme: "https", @@ -252,40 +259,84 @@ func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) { MediaType: mediaType, Length: strconv.FormatInt(size, 10), }) + return 0, nil +} - if c.queueClient != nil { - ml := spec.ManifestLayers{} - json.Unmarshal(body, &ml) +func (c *Gateway) cacheQueueManifest(info *PathInfo, weight int) error { + ctx := context.Background() + if !info.IsDigestManifests { + cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests) + if err == nil { + _, err := c.cache.StatBlob(ctx, cachedDigest) + if err == nil { - if len(ml.Layers) != 0 { - for _, l := range ml.Layers { - if l.Digest == "" { - continue - } - _, err := c.queueClient.Create(context.Background(), l.Digest, 0, model.MessageAttr{ - Host: info.Host, - Image: info.Image, - Size: l.Size, - }) + msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests) + _, err := c.queueClient.Create(context.Background(), msg, 0, model.MessageAttr{}) if err != nil { - c.logger.Warn("failed add blob message to queue", "digest", digest, "error", err) + c.logger.Warn("failed add message to queue", "msg", msg, "error", err) + } else { + c.logger.Info("Add message to queue", "msg", msg) } - } - l := ml.Config - if l.Digest != "" { - - _, err := c.queueClient.Create(context.Background(), l.Digest, 0, model.MessageAttr{ - Host: info.Host, - Image: info.Image, - Size: l.Size, + c.manifestCache.Put(info, cacheValue{ + Digest: cachedDigest, }) - if err != nil { - c.logger.Warn("failed add blob message to queue", "digest", digest, "error", err) - } + return nil } } } - return 0, nil + + var msg string + if info.IsDigestManifests { + msg = fmt.Sprintf("%s/%s@%s", info.Host, info.Image, info.Manifests) + } else { + msg = fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests) + } + spec, err := c.waitingQueue(ctx, msg, weight+1) + if err != nil { + return err + } + + if len(spec) == 0 { + exist, err := c.cache.StatManifest(ctx, info.Host, info.Image, info.Manifests) + if err != nil { + return err + } + if !exist { + return fmt.Errorf("spec is empty") + } + c.manifestCache.Put(info, cacheValue{ + Digest: info.Manifests, + }) + return nil + } + + mt := struct { + MediaType string `json:"mediaType"` + Manifests json.RawMessage `json:"manifests"` + }{} + err = json.Unmarshal(spec, &mt) + if err != nil { + return fmt.Errorf("invalid content: %w: %s", err, string(spec)) + } + + mediaType := mt.MediaType + if mediaType == "" { + if len(mt.Manifests) != 0 { + mediaType = "application/vnd.oci.image.index.v1+json" + } else { + mediaType = "application/vnd.docker.distribution.manifest.v1+json" + } + } + + sum := sha256.Sum256(spec) + digest := "sha256:" + hex.EncodeToString(sum[:]) + c.manifestCache.Put(info, cacheValue{ + Digest: digest, + MediaType: mediaType, + Length: strconv.FormatInt(int64(len(spec)), 10), + Body: spec, + }) + return nil } func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Request, info *PathInfo, t *token.Token) (done bool, fallback bool) { @@ -306,6 +357,15 @@ func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Re rw.Header().Set("Content-Length", val.Length) return true, false } + + if len(val.Body) != 0 { + rw.Header().Set("Docker-Content-Digest", val.Digest) + rw.Header().Set("Content-Type", val.MediaType) + rw.Header().Set("Content-Length", val.Length) + rw.Write(val.Body) + return true, false + } + return c.serveCachedManifest(rw, r, info, false, "hit"), false } @@ -322,6 +382,17 @@ func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Re return false, true } + if t.ManifestWithQueueSync { + if c.queueClient != nil { + err := c.cacheQueueManifest(info, t.Weight) + if err != nil { + c.manifestCache.PutError(info, err, http.StatusForbidden) + return false, false + } + return true, false + } + } + return false, false } @@ -343,6 +414,15 @@ func (c *Gateway) missServeCachedManifest(rw http.ResponseWriter, r *http.Reques rw.Header().Set("Content-Length", val.Length) return true } + + if len(val.Body) != 0 { + rw.Header().Set("Docker-Content-Digest", val.Digest) + rw.Header().Set("Content-Type", val.MediaType) + rw.Header().Set("Content-Length", val.Length) + rw.Write(val.Body) + return true + } + return c.serveCachedManifest(rw, r, info, true, "miss") } diff --git a/gateway/manifest_cache.go b/gateway/manifest_cache.go index 3c52b20..87f5f7f 100644 --- a/gateway/manifest_cache.go +++ b/gateway/manifest_cache.go @@ -72,6 +72,7 @@ func (m *manifestCache) Get(info *PathInfo) (cacheValue, bool) { Digest: key.Tag, MediaType: val.MediaType, Length: val.Length, + Body: val.Body, }, true } @@ -105,6 +106,7 @@ func (m *manifestCache) Put(info *PathInfo, val cacheValue) { m.digest.SetWithTTL(key, cacheDigestValue{ MediaType: val.MediaType, Length: val.Length, + Body: val.Body, }, m.duration) } @@ -124,6 +126,7 @@ type cacheTagValue struct { type cacheDigestValue struct { MediaType string Length string + Body []byte Error error StatusCode int } @@ -132,6 +135,7 @@ type cacheValue struct { Digest string MediaType string Length string + Body []byte Error error StatusCode int } diff --git a/manager/manager.go b/manager/manager.go index 91dd8ad..fb76005 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -226,10 +226,11 @@ func (m *Manager) GetTokenWithUser(ctx context.Context, userinfo *url.Userinfo, TokenID: tok.TokenID, RegistryID: registry.Registry.RegistryID, - NoRateLimit: tok.Data.NoRateLimit, - RateLimitPerSecond: tok.Data.RateLimitPerSecond, - Weight: tok.Data.Weight, - CacheFirst: tok.Data.CacheFirst, + NoRateLimit: tok.Data.NoRateLimit, + RateLimitPerSecond: tok.Data.RateLimitPerSecond, + Weight: tok.Data.Weight, + ManifestWithQueueSync: tok.Data.ManifestWithQueueSync, + CacheFirst: tok.Data.CacheFirst, NoAllowlist: tok.Data.NoAllowlist, NoBlock: tok.Data.NoBlock, diff --git a/manager/model/token.go b/manager/model/token.go index 445a6a1..5d24147 100644 --- a/manager/model/token.go +++ b/manager/model/token.go @@ -14,13 +14,14 @@ type Token struct { } type TokenAttr struct { - NoRateLimit bool `json:"no_rate_limit,omitempty"` - RateLimitPerSecond uint64 `json:"rate_limit_per_second,omitempty"` - Weight int `json:"weight,omitempty"` - CacheFirst bool `json:"cache_first,omitempty"` - AllowTagsList bool `json:"allow_tags_list,omitempty"` - NoAllowlist bool `json:"no_allowlist,omitempty"` - NoBlock bool `json:"no_block,omitempty"` + NoRateLimit bool `json:"no_rate_limit,omitempty"` + RateLimitPerSecond uint64 `json:"rate_limit_per_second,omitempty"` + Weight int `json:"weight,omitempty"` + CacheFirst bool `json:"cache_first,omitempty"` + ManifestWithQueueSync bool `json:"manifest_with_queue_sync,omitempty"` + AllowTagsList bool `json:"allow_tags_list,omitempty"` + NoAllowlist bool `json:"no_allowlist,omitempty"` + NoBlock bool `json:"no_block,omitempty"` NoBlobsAgent bool `json:"no_blobs_agent,omitempty"` BlobsAgentURL string `json:"blobs_url,omitempty"` diff --git a/token/encoding.go b/token/encoding.go index f6d3dbf..670fee0 100644 --- a/token/encoding.go +++ b/token/encoding.go @@ -52,6 +52,8 @@ type Attribute struct { CacheFirst bool `json:"cache_first,omitempty"` Weight int `json:"weight,omitempty"` + ManifestWithQueueSync bool `json:"manifest_with_queue_sync,omitempty"` + Host string `json:"host,omitempty"` Image string `json:"image,omitempty"`