Commit

Fix limit
wzshiming committed Feb 11, 2025
1 parent c04a1ad commit 055d29e
Showing 5 changed files with 86 additions and 63 deletions.
98 changes: 41 additions & 57 deletions agent/agent.go
@@ -15,13 +15,14 @@ import (

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/internal/queue"
"github.com/daocloud/crproxy/internal/throttled"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/queue/model"
"github.com/daocloud/crproxy/token"
"github.com/docker/distribution/registry/api/errcode"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/wzshiming/geario"
"golang.org/x/time/rate"
)

var (
@@ -60,7 +61,7 @@ type Agent struct {

blobNoRedirectSize int
blobNoRedirectMaxSizePerSecond int
blobNoRedirectBPS *geario.BPS
blobNoRedirectLimit *rate.Limiter

queueClient *client.MessageClient
}
@@ -113,9 +114,9 @@ func WithBlobNoRedirectSize(blobNoRedirectSize int) Option {
func WithBlobNoRedirectMaxSizePerSecond(blobNoRedirectMaxSizePerSecond int) Option {
return func(c *Agent) error {
if blobNoRedirectMaxSizePerSecond > 0 {
c.blobNoRedirectBPS = geario.NewBPSAver(time.Second)
c.blobNoRedirectLimit = rate.NewLimiter(rate.Limit(blobNoRedirectMaxSizePerSecond), 1024*1024*1024)
} else {
c.blobNoRedirectBPS = nil
c.blobNoRedirectLimit = nil
}
c.blobNoRedirectMaxSizePerSecond = blobNoRedirectMaxSizePerSecond
return nil
@@ -337,13 +338,12 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)
if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}

c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}

@@ -356,17 +356,15 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
} else {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
} else {
@@ -377,8 +375,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
}
@@ -401,13 +398,11 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, value.Size, start)

if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
return
}

@@ -418,8 +413,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}
}
@@ -430,8 +424,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.rateLimit(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size())
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
return
}

@@ -561,7 +554,9 @@ func (c *Agent) serveCachedBlobHead(rw http.ResponseWriter, r *http.Request, siz
return false
}

func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {
func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64, start time.Time) {
c.rateLimit(rw, r, info.Blobs, info, t, size, start)

referer := r.RemoteAddr
if info != nil {
referer = fmt.Sprintf("%d-%d:%s:%s/%s", t.RegistryID, t.TokenID, referer, info.Host, info.Image)
@@ -582,39 +577,40 @@ func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob
return
}

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64) {

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64, start time.Time) {
if c.blobNoRedirectSize < 0 || int64(c.blobNoRedirectSize) > size {
data, err := c.cache.GetBlob(r.Context(), info.Blobs)
if err != nil {
c.logger.Info("failed to get blob", "digest", blob, "error", err)
c.blobCache.Remove(info.Blobs)
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
return
}
defer data.Close()
if c.blobNoRedirectLimit == nil || c.blobNoRedirectLimit.Reserve().Delay() == 0 {
data, err := c.cache.GetBlob(r.Context(), info.Blobs)
if err != nil {
c.logger.Info("failed to get blob", "digest", blob, "error", err)
c.blobCache.Remove(info.Blobs)
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
return
}
defer data.Close()

c.blobCache.Put(info.Blobs, size, false)
c.blobCache.Put(info.Blobs, size, false)

rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
rw.Header().Set("Content-Type", "application/octet-stream")
rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
rw.Header().Set("Content-Type", "application/octet-stream")

if c.blobNoRedirectBPS == nil {
io.Copy(rw, data)
return
}
var body io.Reader = data
if t.RateLimitPerSecond > 0 {
limit := rate.NewLimiter(rate.Limit(t.RateLimitPerSecond), 1024*1024)
body = throttled.NewThrottledReader(r.Context(), body, limit)
}

if int(c.blobNoRedirectBPS.Aver()) < c.blobNoRedirectMaxSizePerSecond {
r := &readerCounter{
r: data,
counter: c.blobNoRedirectBPS,
if c.blobNoRedirectLimit != nil {
body = throttled.NewThrottledReader(r.Context(), body, c.blobNoRedirectLimit)
}
io.Copy(rw, r)

io.Copy(rw, body)
return
}
// fallback to redirect
}

c.rateLimit(rw, r, info.Blobs, info, t, size, start)

referer := r.RemoteAddr
if info != nil {
referer = fmt.Sprintf("%d-%d:%s:%s/%s", t.RegistryID, t.TokenID, referer, info.Host, info.Image)
@@ -685,15 +681,3 @@ func (c *Agent) waitingQueue(ctx context.Context, msg string, weight int, info *
return client.MessageResponse{}, fmt.Errorf("unexpected status %d for message %q", mr.Status, msg)
}
}

type readerCounter struct {
r io.Reader
counter *geario.BPS
}

func (r *readerCounter) Read(b []byte) (int, error) {
n, err := r.r.Read(b)

r.counter.Add(geario.B(len(b)))
return n, err
}
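
The heart of the agent change is the non-blocking capacity probe on the shared limiter: Reserve().Delay() == 0 means a byte-token is available right now, so the blob can be streamed in-process; otherwise the handler falls through to the redirect path. Below is a minimal sketch of that gate under stated assumptions — serveDirectOrRedirect and its wiring are hypothetical and not part of the repository:

    package blobserve

    import (
    	"io"
    	"net/http"
    	"strconv"

    	"golang.org/x/time/rate"
    )

    // serveDirectOrRedirect streams blob to the client only when the shared
    // limiter has spare capacity; otherwise it returns false so the caller can
    // fall back to issuing a redirect, mirroring the gate in serveCachedBlob.
    func serveDirectOrRedirect(rw http.ResponseWriter, blob io.Reader, size int64, limiter *rate.Limiter) bool {
    	// Reserve claims one token and reports how long we would have to wait
    	// for it; a zero delay means the limiter is not currently saturated.
    	// (A stricter variant would Cancel() the unused reservation here.)
    	if limiter != nil && limiter.Reserve().Delay() != 0 {
    		return false // saturated: redirect instead of serving directly
    	}

    	rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
    	rw.Header().Set("Content-Type", "application/octet-stream")

    	// The real handler wraps blob in a throttled reader before copying so
    	// the transfer itself is paced by the same limiter.
    	io.Copy(rw, blob)
    	return true
    }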
6 changes: 4 additions & 2 deletions gateway/gateway.go
@@ -13,11 +13,12 @@ import (
"github.com/daocloud/crproxy/agent"
"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/internal/queue"
"github.com/daocloud/crproxy/internal/throttled"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/token"
"github.com/docker/distribution/registry/api/errcode"
"github.com/wzshiming/geario"
"golang.org/x/time/rate"
)

var (
@@ -351,7 +352,8 @@ func (c *Gateway) forward(rw http.ResponseWriter, r *http.Request, info *PathInf
var body io.Reader = resp.Body

if t.RateLimitPerSecond > 0 {
body = geario.NewGear(time.Second, geario.B(t.RateLimitPerSecond)).Reader(body)
limit := rate.NewLimiter(rate.Limit(t.RateLimitPerSecond), 1024*1024)
body = throttled.NewThrottledReader(r.Context(), body, limit)
}

io.Copy(rw, body)
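
Both call sites build the limiter the same way but at different scopes: the gateway creates one per forwarded request from the token's RateLimitPerSecond, while the agent keeps a single limiter for all no-redirect traffic. A side-by-side sketch of the two constructions — the function names are illustrative, but the rates and bursts match the values used in the diffs above:

    package limiters

    import "golang.org/x/time/rate"

    // Per-request: each forwarded download gets its own bucket sized from the
    // client token, so one client's budget never slows another client down.
    func perRequestLimiter(bytesPerSecond int64) *rate.Limiter {
    	return rate.NewLimiter(rate.Limit(bytesPerSecond), 1024*1024) // 1 MiB burst
    }

    // Shared: one bucket for the whole agent. The bucket starts full, so roughly
    // a gigabyte of direct traffic can flow before the Reserve gate starts
    // steering blob requests to redirects; it refills at the configured rate.
    func sharedNoRedirectLimiter(bytesPerSecond int64) *rate.Limiter {
    	return rate.NewLimiter(rate.Limit(bytesPerSecond), 1024*1024*1024) // 1 GiB burst
    }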
4 changes: 2 additions & 2 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/daocloud/crproxy
go 1.23.4

require (
github.com/aws/aws-sdk-go v1.48.10
github.com/denverdino/aliyungo v0.0.0
github.com/distribution/reference v0.6.0
github.com/docker/distribution v2.8.2+incompatible
@@ -16,11 +17,11 @@
github.com/opencontainers/go-digest v1.0.0
github.com/spf13/cobra v1.8.1
github.com/wzshiming/cmux v0.4.2
github.com/wzshiming/geario v0.0.0-20240308093553-a996e3817533
github.com/wzshiming/hostmatcher v0.0.3
github.com/wzshiming/httpseek v0.1.0
github.com/wzshiming/imc v0.0.0-20250106051804-1cb884b5184a
golang.org/x/crypto v0.28.0
golang.org/x/time v0.10.0
)

replace (
@@ -30,7 +31,6 @@

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/aws/aws-sdk-go v1.48.10 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
6 changes: 4 additions & 2 deletions go.sum
@@ -165,8 +165,6 @@ github.com/wzshiming/aliyungo v0.0.0-20241126040137-4b8c22b50cd3 h1:MwH8lliQekyh
github.com/wzshiming/aliyungo v0.0.0-20241126040137-4b8c22b50cd3/go.mod h1:TK05uvk4XXfK2kdvRwfcZ1NaxjDxmm7H3aQLko0mJxA=
github.com/wzshiming/cmux v0.4.2 h1:tI73lL5ztVfiqw7R5m5BkxT1+vQ2PBo/oV6qPbNGPiA=
github.com/wzshiming/cmux v0.4.2/go.mod h1:JgE61QfZAjEyNMX0iZo9zIKY6pr9bHVY132yYPwHW5U=
github.com/wzshiming/geario v0.0.0-20240308093553-a996e3817533 h1:mq74wxgDCz7Q6CqZYExt0DHf7Ze28lyMW/TNsfcuk8M=
github.com/wzshiming/geario v0.0.0-20240308093553-a996e3817533/go.mod h1:Fodw3HJvNUS+/MgqXCRp9iYLQfynAu/LKXGOWoX+D/Q=
github.com/wzshiming/hostmatcher v0.0.3 h1:+JYAq6vUZXDEQ1Ipfdc/D7HmaIMngcc71ftonyCQVQk=
github.com/wzshiming/hostmatcher v0.0.3/go.mod h1:F04RIvIWEvOIrIKOlQlMuR8vQMKAVf2YhpU6l31Wwz4=
github.com/wzshiming/httpseek v0.1.0 h1:lEgL7EBELT/VV9UaTp+m3kw5Pe1KOUdY+IPnKkag6tI=
@@ -198,6 +196,10 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4=
golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
35 changes: 35 additions & 0 deletions internal/throttled/reader.go
@@ -0,0 +1,35 @@
package throttled

import (
"context"
"io"

"golang.org/x/time/rate"
)

type throttledReader struct {
r io.Reader
limiter *rate.Limiter
ctx context.Context
}

func NewThrottledReader(ctx context.Context, r io.Reader, limiter *rate.Limiter) io.Reader {
return &throttledReader{
r: r,
limiter: limiter,
ctx: ctx,
}
}

func (r *throttledReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err != nil {
return n, err
}

if err := r.limiter.WaitN(r.ctx, n); err != nil {
return n, err
}

return n, nil
}
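
The reader reads first and then waits, so the first chunk is delivered immediately and pacing applies from the next read on; note that WaitN returns an error if a single read exceeds the limiter's burst, so the burst should stay comfortably above typical copy chunk sizes (io.Copy's default buffer is 32 KiB, well under the 1 MiB bursts used above). A minimal usage sketch, assuming code living inside the crproxy module (internal packages are not importable from outside) and a placeholder URL:

    package main

    import (
    	"context"
    	"io"
    	"net/http"
    	"os"

    	"github.com/daocloud/crproxy/internal/throttled"
    	"golang.org/x/time/rate"
    )

    func main() {
    	resp, err := http.Get("https://example.com/large-blob") // placeholder URL
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()

    	// 512 KiB/s sustained with a 1 MiB burst: after each Read, WaitN blocks
    	// until enough byte-tokens have accumulated; cancelling the context
    	// aborts the wait and surfaces the error to io.Copy.
    	limiter := rate.NewLimiter(rate.Limit(512*1024), 1024*1024)
    	body := throttled.NewThrottledReader(context.Background(), resp.Body, limiter)

    	if _, err := io.Copy(os.Stdout, body); err != nil {
    		panic(err)
    	}
    }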
