Skip to content

Commit

Permalink
Manifests check with queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 7, 2025
1 parent 381dc6b commit 8841e7b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 50 deletions.
158 changes: 119 additions & 39 deletions gateway/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package gateway

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -11,7 +13,6 @@ import (
"strconv"
"time"

"github.com/daocloud/crproxy/internal/spec"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/model"
"github.com/daocloud/crproxy/token"
Expand All @@ -21,12 +22,12 @@ import (

func (c *Gateway) worker(ctx context.Context) {
for {
info, weight, finish, ok := c.queue.GetOrWaitWithDone(ctx.Done())
info, _, finish, ok := c.queue.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}

sc, err := c.cacheManifest(&info, weight)
sc, err := c.cacheManifest(&info)
if err != nil {
c.manifestCache.PutError(&info, err, sc)
}
Expand Down Expand Up @@ -66,27 +67,30 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request,
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
}

func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) error {
func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) ([]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

mr, err := c.queueClient.Create(ctx, msg, weight+1, model.MessageAttr{})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
return nil, fmt.Errorf("failed to create queue: %w", err)
}

if mr.Status == model.StatusPending {
if len(mr.Data.Spec) != 0 {
return mr.Data.Spec, nil
}
if mr.Status == model.StatusPending || mr.Status == model.StatusProcessing {
c.logger.Info("watching message from queue", "msg", msg)

chMr, err := c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return fmt.Errorf("failed to watch message: %w", err)
return nil, fmt.Errorf("failed to watch message: %w", err)
}
watiQueue:
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case m, ok := <-chMr:
if !ok {
if mr.Status != model.StatusPending && mr.Status != model.StatusProcessing {
Expand All @@ -96,9 +100,12 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro
time.Sleep(1 * time.Second)
chMr, err = c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return fmt.Errorf("failed to re-watch message: %w", err)
return nil, fmt.Errorf("failed to re-watch message: %w", err)
}
} else {
if len(m.Data.Spec) != 0 {
return m.Data.Spec, nil
}
mr = m
}
}
Expand All @@ -107,15 +114,15 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro

switch mr.Status {
case model.StatusCompleted:
return nil
return nil, nil
case model.StatusFailed:
return fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error)
return nil, fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error)
default:
return fmt.Errorf("unexpected status %q for message %q", mr.Status, msg)
return nil, fmt.Errorf("unexpected status %d for message %q", mr.Status, msg)
}
}

func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) {
func (c *Gateway) cacheManifest(info *PathInfo) (int, error) {
ctx := context.Background()
u := &url.URL{
Scheme: "https",
Expand Down Expand Up @@ -252,40 +259,84 @@ func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) {
MediaType: mediaType,
Length: strconv.FormatInt(size, 10),
})
return 0, nil
}

if c.queueClient != nil {
ml := spec.ManifestLayers{}
json.Unmarshal(body, &ml)
func (c *Gateway) cacheQueueManifest(info *PathInfo, weight int) error {
ctx := context.Background()
if !info.IsDigestManifests {
cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests)
if err == nil {
_, err := c.cache.StatBlob(ctx, cachedDigest)
if err == nil {

if len(ml.Layers) != 0 {
for _, l := range ml.Layers {
if l.Digest == "" {
continue
}
_, err := c.queueClient.Create(context.Background(), l.Digest, 0, model.MessageAttr{
Host: info.Host,
Image: info.Image,
Size: l.Size,
})
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
_, err := c.queueClient.Create(context.Background(), msg, 0, model.MessageAttr{})
if err != nil {
c.logger.Warn("failed add blob message to queue", "digest", digest, "error", err)
c.logger.Warn("failed add message to queue", "msg", msg, "error", err)
} else {
c.logger.Info("Add message to queue", "msg", msg)
}
}
l := ml.Config
if l.Digest != "" {

_, err := c.queueClient.Create(context.Background(), l.Digest, 0, model.MessageAttr{
Host: info.Host,
Image: info.Image,
Size: l.Size,
c.manifestCache.Put(info, cacheValue{
Digest: cachedDigest,
})
if err != nil {
c.logger.Warn("failed add blob message to queue", "digest", digest, "error", err)
}
return nil
}
}
}
return 0, nil

var msg string
if info.IsDigestManifests {
msg = fmt.Sprintf("%s/%s@%s", info.Host, info.Image, info.Manifests)
} else {
msg = fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
}
spec, err := c.waitingQueue(ctx, msg, weight+1)
if err != nil {
return err
}

if len(spec) == 0 {
exist, err := c.cache.StatManifest(ctx, info.Host, info.Image, info.Manifests)
if err != nil {
return err
}
if !exist {
return fmt.Errorf("spec is empty")
}
c.manifestCache.Put(info, cacheValue{
Digest: info.Manifests,
})
return nil
}

mt := struct {
MediaType string `json:"mediaType"`
Manifests json.RawMessage `json:"manifests"`
}{}
err = json.Unmarshal(spec, &mt)
if err != nil {
return fmt.Errorf("invalid content: %w: %s", err, string(spec))
}

mediaType := mt.MediaType
if mediaType == "" {
if len(mt.Manifests) != 0 {
mediaType = "application/vnd.oci.image.index.v1+json"
} else {
mediaType = "application/vnd.docker.distribution.manifest.v1+json"
}
}

sum := sha256.Sum256(spec)
digest := "sha256:" + hex.EncodeToString(sum[:])
c.manifestCache.Put(info, cacheValue{
Digest: digest,
MediaType: mediaType,
Length: strconv.FormatInt(int64(len(spec)), 10),
Body: spec,
})
return nil
}

func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Request, info *PathInfo, t *token.Token) (done bool, fallback bool) {
Expand All @@ -306,6 +357,15 @@ func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Re
rw.Header().Set("Content-Length", val.Length)
return true, false
}

if len(val.Body) != 0 {
rw.Header().Set("Docker-Content-Digest", val.Digest)
rw.Header().Set("Content-Type", val.MediaType)
rw.Header().Set("Content-Length", val.Length)
rw.Write(val.Body)
return true, false
}

return c.serveCachedManifest(rw, r, info, false, "hit"), false
}

Expand All @@ -322,6 +382,17 @@ func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Re
return false, true
}

if t.ManifestWithQueueSync {
if c.queueClient != nil {
err := c.cacheQueueManifest(info, t.Weight)
if err != nil {
c.manifestCache.PutError(info, err, http.StatusForbidden)
return false, false
}
return true, false
}
}

return false, false
}

Expand All @@ -343,6 +414,15 @@ func (c *Gateway) missServeCachedManifest(rw http.ResponseWriter, r *http.Reques
rw.Header().Set("Content-Length", val.Length)
return true
}

if len(val.Body) != 0 {
rw.Header().Set("Docker-Content-Digest", val.Digest)
rw.Header().Set("Content-Type", val.MediaType)
rw.Header().Set("Content-Length", val.Length)
rw.Write(val.Body)
return true
}

return c.serveCachedManifest(rw, r, info, true, "miss")
}

Expand Down
4 changes: 4 additions & 0 deletions gateway/manifest_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (m *manifestCache) Get(info *PathInfo) (cacheValue, bool) {
Digest: key.Tag,
MediaType: val.MediaType,
Length: val.Length,
Body: val.Body,
}, true
}

Expand Down Expand Up @@ -105,6 +106,7 @@ func (m *manifestCache) Put(info *PathInfo, val cacheValue) {
m.digest.SetWithTTL(key, cacheDigestValue{
MediaType: val.MediaType,
Length: val.Length,
Body: val.Body,
}, m.duration)

}
Expand All @@ -124,6 +126,7 @@ type cacheTagValue struct {
type cacheDigestValue struct {
MediaType string
Length string
Body []byte
Error error
StatusCode int
}
Expand All @@ -132,6 +135,7 @@ type cacheValue struct {
Digest string
MediaType string
Length string
Body []byte
Error error
StatusCode int
}
Expand Down
9 changes: 5 additions & 4 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ func (m *Manager) GetTokenWithUser(ctx context.Context, userinfo *url.Userinfo,
TokenID: tok.TokenID,
RegistryID: registry.Registry.RegistryID,

NoRateLimit: tok.Data.NoRateLimit,
RateLimitPerSecond: tok.Data.RateLimitPerSecond,
Weight: tok.Data.Weight,
CacheFirst: tok.Data.CacheFirst,
NoRateLimit: tok.Data.NoRateLimit,
RateLimitPerSecond: tok.Data.RateLimitPerSecond,
Weight: tok.Data.Weight,
ManifestWithQueueSync: tok.Data.ManifestWithQueueSync,
CacheFirst: tok.Data.CacheFirst,

NoAllowlist: tok.Data.NoAllowlist,
NoBlock: tok.Data.NoBlock,
Expand Down
15 changes: 8 additions & 7 deletions manager/model/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ type Token struct {
}

type TokenAttr struct {
NoRateLimit bool `json:"no_rate_limit,omitempty"`
RateLimitPerSecond uint64 `json:"rate_limit_per_second,omitempty"`
Weight int `json:"weight,omitempty"`
CacheFirst bool `json:"cache_first,omitempty"`
AllowTagsList bool `json:"allow_tags_list,omitempty"`
NoAllowlist bool `json:"no_allowlist,omitempty"`
NoBlock bool `json:"no_block,omitempty"`
NoRateLimit bool `json:"no_rate_limit,omitempty"`
RateLimitPerSecond uint64 `json:"rate_limit_per_second,omitempty"`
Weight int `json:"weight,omitempty"`
CacheFirst bool `json:"cache_first,omitempty"`
ManifestWithQueueSync bool `json:"manifest_with_queue_sync,omitempty"`
AllowTagsList bool `json:"allow_tags_list,omitempty"`
NoAllowlist bool `json:"no_allowlist,omitempty"`
NoBlock bool `json:"no_block,omitempty"`

NoBlobsAgent bool `json:"no_blobs_agent,omitempty"`
BlobsAgentURL string `json:"blobs_url,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions token/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Attribute struct {
CacheFirst bool `json:"cache_first,omitempty"`
Weight int `json:"weight,omitempty"`

ManifestWithQueueSync bool `json:"manifest_with_queue_sync,omitempty"`

Host string `json:"host,omitempty"`
Image string `json:"image,omitempty"`

Expand Down

0 comments on commit 8841e7b

Please sign in to comment.