Skip to content

Commit

Permalink
Support range header for agent
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 13, 2025
1 parent e000404 commit 29ddd81
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 37 deletions.
96 changes: 61 additions & 35 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/internal/queue"
"github.com/daocloud/crproxy/internal/seeker"
"github.com/daocloud/crproxy/internal/throttled"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
Expand Down Expand Up @@ -339,11 +340,11 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
}

if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.ModTime, value.Size, start)
return
}

c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.ModTime, value.Size, start)
return
}

Expand All @@ -356,15 +357,15 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.ModTime(), stat.Size(), start)
return
}
} else {
if c.serveCachedBlobHead(rw, r, stat.Size()) {
return
}

c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.ModTime(), stat.Size(), start)
return
}
} else {
Expand All @@ -375,7 +376,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.ModTime(), stat.Size(), start)
return
}
}
Expand All @@ -399,10 +400,10 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
}

if value.BigCache {
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, value.ModTime, value.Size, start)
return
}
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.Size, start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, value.ModTime, value.Size, start)
return
}

Expand All @@ -413,7 +414,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveBigCachedBlob(rw, r, info.Blobs, info, t, stat.ModTime(), stat.Size(), start)
return
}
}
Expand All @@ -424,7 +425,7 @@ func (c *Agent) Serve(rw http.ResponseWriter, r *http.Request, info *BlobInfo, t
return
}

c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.Size(), start)
c.serveCachedBlob(rw, r, info.Blobs, info, t, stat.ModTime(), stat.Size(), start)
return
}

Expand Down Expand Up @@ -521,15 +522,33 @@ func (c *Agent) cacheBlob(info *BlobInfo) (int64, func() error, int, error) {
if err != nil {
return fmt.Errorf("Put to big cache: %w", err)
}
c.blobCache.PutNoTTL(info.Blobs, size, true)

stat, err := c.cache.StatBlob(ctx, info.Blobs)
if err != nil {
return err
}

if size != stat.Size() {
return fmt.Errorf("size mismatch: expected %d, got %d", stat.Size(), size)
}
c.blobCache.PutNoTTL(info.Blobs, stat.ModTime(), size, true)
return nil
}

size, err := c.cache.PutBlob(ctx, info.Blobs, resp.Body)
if err != nil {
return err
}
c.blobCache.Put(info.Blobs, size, false)

stat, err := c.cache.StatBlob(ctx, info.Blobs)
if err != nil {
return err
}

if size != stat.Size() {
return fmt.Errorf("size mismatch: expected %d, got %d", stat.Size(), size)
}
c.blobCache.Put(info.Blobs, stat.ModTime(), size, false)
return nil
}

Expand All @@ -554,7 +573,7 @@ func (c *Agent) serveCachedBlobHead(rw http.ResponseWriter, r *http.Request, siz
return false
}

func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64, start time.Time) {
func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, modTime time.Time, size int64, start time.Time) {
c.rateLimit(rw, r, info.Blobs, info, t, size, start)

referer := r.RemoteAddr
Expand All @@ -570,41 +589,48 @@ func (c *Agent) serveBigCachedBlob(rw http.ResponseWriter, r *http.Request, blob
return
}

c.blobCache.PutNoTTL(info.Blobs, size, true)
c.blobCache.PutNoTTL(info.Blobs, modTime, size, true)

c.logger.Info("Big Cache hit", "digest", blob, "url", u)
http.Redirect(rw, r, u, http.StatusTemporaryRedirect)
return
}

func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, size int64, start time.Time) {
func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob string, info *BlobInfo, t *token.Token, modTime time.Time, size int64, start time.Time) {
if c.blobNoRedirectSize < 0 || int64(c.blobNoRedirectSize) > size {
if c.blobNoRedirectLimit == nil || c.blobNoRedirectLimit.Reserve().Delay() == 0 {
data, err := c.cache.GetBlob(r.Context(), info.Blobs)
if err != nil {
c.logger.Info("failed to get blob", "digest", blob, "error", err)
c.blobCache.Remove(info.Blobs)
utils.ServeError(rw, r, errcode.ErrorCodeUnknown, 0)
return
}
defer data.Close()
rw.Header().Set("Content-Type", "application/octet-stream")

c.blobCache.Put(info.Blobs, size, false)
rs := seeker.NewReadSeekCloser(func(start int64) (io.ReadCloser, error) {
data, err := c.cache.GetBlobWithOffset(r.Context(), info.Blobs, start)
if err != nil {
return nil, err
}

rw.Header().Set("Content-Length", strconv.FormatInt(size, 10))
rw.Header().Set("Content-Type", "application/octet-stream")
var body io.Reader = data
if t.RateLimitPerSecond > 0 {
limit := rate.NewLimiter(rate.Limit(t.RateLimitPerSecond), 1024*1024)
body = throttled.NewThrottledReader(r.Context(), body, limit)
}

var body io.Reader = data
if t.RateLimitPerSecond > 0 {
limit := rate.NewLimiter(rate.Limit(t.RateLimitPerSecond), 1024*1024)
body = throttled.NewThrottledReader(r.Context(), body, limit)
}
if c.blobNoRedirectLimit != nil {
body = throttled.NewThrottledReader(r.Context(), body, c.blobNoRedirectLimit)
}

if c.blobNoRedirectLimit != nil {
body = throttled.NewThrottledReader(r.Context(), body, c.blobNoRedirectLimit)
}
return struct {
io.Reader
io.Closer
}{
Reader: body,
Closer: data,
}, nil
}, size)
defer rs.Close()

http.ServeContent(rw, r, "", modTime, rs)

c.blobCache.Put(info.Blobs, modTime, size, false)

io.Copy(rw, body)
return
}
}
Expand All @@ -624,7 +650,7 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st
return
}

c.blobCache.Put(info.Blobs, size, false)
c.blobCache.Put(info.Blobs, modTime, size, false)

c.logger.Info("Cache hit", "digest", blob, "url", u)
http.Redirect(rw, r, u, http.StatusTemporaryRedirect)
Expand Down
7 changes: 5 additions & 2 deletions agent/blob_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,25 @@ func (m *blobCache) PutError(key string, err error, sc int) {
}, m.duration)
}

func (m *blobCache) Put(key string, size int64, bigCache bool) {
// Put records a blob's metadata (mod time, size, and whether it lives in the
// big cache) under key, expiring after the cache's configured duration.
func (m *blobCache) Put(key string, modTime time.Time, size int64, bigCache bool) {
	m.digest.SetWithTTL(key, blobValue{
		Size: size,
		ModTime: modTime,
		BigCache: bigCache,
	}, m.duration)
}

func (m *blobCache) PutNoTTL(key string, size int64, bigCache bool) {
// PutNoTTL records a blob's metadata under key without an expiry, for entries
// that should stay cached until explicitly removed.
func (m *blobCache) PutNoTTL(key string, modTime time.Time, size int64, bigCache bool) {
	m.digest.Set(key, blobValue{
		Size: size,
		ModTime: modTime,
		BigCache: bigCache,
	})
}

type blobValue struct {
Size int64
ModTime time.Time
BigCache bool
Error error
StatusCode int
Expand Down
4 changes: 4 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ func (c *Cache) Get(ctx context.Context, cachePath string) (io.ReadCloser, error
return c.storageDriver.Reader(ctx, cachePath, 0)
}

// GetWithOffset returns a reader over the cached content at cachePath,
// starting at the given byte offset (delegates to the storage driver's
// ranged Reader).
func (c *Cache) GetWithOffset(ctx context.Context, cachePath string, offset int64) (io.ReadCloser, error) {
	return c.storageDriver.Reader(ctx, cachePath, offset)
}

// GetContent reads and returns the entire cached content at cachePath.
func (c *Cache) GetContent(ctx context.Context, cachePath string) ([]byte, error) {
	return c.storageDriver.GetContent(ctx, cachePath)
}
Expand Down
5 changes: 5 additions & 0 deletions cache/cache_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (c *Cache) GetBlob(ctx context.Context, blob string) (io.ReadCloser, error)
return c.Get(ctx, cachePath)
}

// GetBlobWithOffset returns a reader over the cached blob, starting at the
// given byte offset into the blob's content.
func (c *Cache) GetBlobWithOffset(ctx context.Context, blob string, offset int64) (io.ReadCloser, error) {
	cachePath := blobCachePath(blob)
	return c.GetWithOffset(ctx, cachePath, offset)
}

func (c *Cache) DeleteBlob(ctx context.Context, blob string) error {
cachePath := blobCachePath(blob)
return c.Delete(ctx, cachePath)
Expand Down
89 changes: 89 additions & 0 deletions internal/seeker/seeker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package seeker

import (
"fmt"
"io"
)

type readSeekCloser struct {
getReader func(start int64) (io.ReadCloser, error)
size int64
offset int64
current io.ReadCloser
}

func NewReadSeekCloser(getReadCloser func(int64) (io.ReadCloser, error), size int64) io.ReadSeekCloser {
return &readSeekCloser{
getReader: getReadCloser,
size: size,
offset: 0,
current: nil,
}
}

func (rs *readSeekCloser) Read(p []byte) (int, error) {
if rs.offset >= rs.size {
return 0, io.EOF
}

if rs.current == nil {
reader, err := rs.getReader(rs.offset)
if err != nil {
return 0, err
}
rs.current = reader
}

n, err := rs.current.Read(p)
rs.offset += int64(n)

if err == io.EOF {
rs.current.Close()
rs.current = nil
if rs.offset >= rs.size {
err = io.EOF
} else {
err = nil
}
}

return n, err
}

func (rs *readSeekCloser) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = rs.offset + offset
case io.SeekEnd:
newOffset = rs.size + offset
default:
return 0, fmt.Errorf("seek: invalid whence")
}

if newOffset < 0 {
return 0, fmt.Errorf("seek: negative position")
}
if newOffset > rs.size {
newOffset = rs.size
}

if rs.current != nil && newOffset != rs.offset {
rs.current.Close()
rs.current = nil
}

rs.offset = newOffset
return newOffset, nil
}

func (rs *readSeekCloser) Close() error {
if rs.current != nil {
err := rs.current.Close()
rs.current = nil
return err
}
return nil
}

0 comments on commit 29ddd81

Please sign in to comment.