Skip to content

Commit

Permalink
Fix limit no redirect
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 11, 2025
1 parent a15c653 commit bc6dd78
Showing 1 changed file with 26 additions and 46 deletions.
72 changes: 26 additions & 46 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Agent struct {

blobNoRedirectSize int
blobNoRedirectMaxSizePerSecond int
blobNoRedirectBPS *geario.BPS
blobNoRedirectGear *geario.Gear

queueClient *client.MessageClient
}
Expand Down Expand Up @@ -113,9 +113,9 @@ func WithBlobNoRedirectSize(blobNoRedirectSize int) Option {
func WithBlobNoRedirectMaxSizePerSecond(blobNoRedirectMaxSizePerSecond int) Option {
return func(c *Agent) error {
if blobNoRedirectMaxSizePerSecond > 0 {
c.blobNoRedirectBPS = geario.NewBPSAver(time.Second)
c.blobNoRedirectGear = geario.NewGear(time.Second, geario.B(blobNoRedirectMaxSizePerSecond))
} else {
c.blobNoRedirectBPS = nil
c.blobNoRedirectGear = nil
}
c.blobNoRedirectMaxSizePerSecond = blobNoRedirectMaxSizePerSecond
return nil
Expand Down Expand Up @@ -337,13 +337,12 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)
if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}

c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}

Expand All @@ -356,17 +355,15 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
} else {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
} else {
Expand All @@ -377,8 +374,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
}
Expand All @@ -401,13 +397,11 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)

if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}

Expand All @@ -418,8 +412,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
}
Expand All @@ -430,8 +423,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}

Expand Down Expand Up @@ -561,7 +553,9 @@ func (c *Agent) serveCachedBlobHead(rw http.ResponseWriter, r *http.Request, siz
return false
}

func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {
func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64, start time.Time) {
c.rateLimit(rw, r, info.Blobs, info, t, size, start)

referer := r.RemoteAddr
if info != nil {
referer = fmt.Sprintf("%d-%d:%s:%s/%s", t.RegistryID, t.TokenID, referer, info.Host, info.Image)
Expand All @@ -582,8 +576,7 @@ func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob
return
}

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64, start time.Time) {
if c.blobNoRedirectSize < 0 || int64(c.blobNoRedirectSize) > size {
data, err := c.cache.GetBlob(r.Context(), info.Blobs)
if err != nil {
Expand All @@ -599,22 +592,21 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st
rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
rw.Header().Set("Content-Type", "application/octet-stream")

if c.blobNoRedirectBPS == nil {
io.Copy(rw, data)
return
var body io.Reader = data
if t.RateLimitPerSecond > 0 {
body = geario.NewGear(time.Second, geario.B(t.RateLimitPerSecond)).Reader(body)
}

if int(c.blobNoRedirectBPS.Aver()) < c.blobNoRedirectMaxSizePerSecond {
r := &readerCounter{
r: data,
counter: c.blobNoRedirectBPS,
}
io.Copy(rw, r)
return
if c.blobNoRedirectGear != nil && int(c.blobNoRedirectGear.Aver()) < c.blobNoRedirectMaxSizePerSecond/10*9 {
body = c.blobNoRedirectGear.Reader(body)
}
// fallback to redirect

io.Copy(rw, body)
return
}

c.rateLimit(rw, r, info.Blobs, info, t, size, start)

referer := r.RemoteAddr
if info != nil {
referer = fmt.Sprintf("%d-%d:%s:%s/%s", t.RegistryID, t.TokenID, referer, info.Host, info.Image)
Expand Down Expand Up @@ -685,15 +677,3 @@ func (c *Agent) waitingQueue(ctx context.Context, msg string, weight int, info *
return client.MessageResponse{}, fmt.Errorf("unexpected status %d for message %q", mr.Status, msg)
}
}

type readerCounter struct {
r io.Reader
counter *geario.BPS
}

func (r *readerCounter) Read(b []byte) (int, error) {
n, err := r.r.Read(b)

r.counter.Add(geario.B(len(b)))
return n, err
}

0 comments on commit bc6dd78

Please sign in to comment.