Skip to content

Commit

Permalink
Refactor the storage
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 17, 2025
1 parent 11cd817 commit 84ffcae
Show file tree
Hide file tree
Showing 19 changed files with 56 additions and 3,730 deletions.
9 changes: 5 additions & 4 deletions cache/blob_writer.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package cache

import (
"context"
"encoding/hex"
"fmt"
"hash"

storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/wzshiming/sss"
)

type blobWriter struct {
storagedriver.FileWriter
sss.FileWriter
cacheHash string
h hash.Hash
}

func (bw *blobWriter) Commit() error {
func (bw *blobWriter) Commit(ctx context.Context) error {
hash := hex.EncodeToString(bw.h.Sum(nil)[:])
if bw.cacheHash != hash {
return fmt.Errorf("expected %s hash, got %s", bw.cacheHash, hash)
}
return bw.FileWriter.Commit()
return bw.FileWriter.Commit(ctx)
}

func (bw *blobWriter) Write(p []byte) (int, error) {
Expand Down
54 changes: 26 additions & 28 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"path"
"strings"
"sync"
"time"

storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/wzshiming/sss"
)

type Cache struct {
bytesPool sync.Pool
storageDriver storagedriver.StorageDriver
storageDriver *sss.SSS
linkExpires time.Duration
signLink bool
redirectLinks *url.URL
Expand All @@ -39,7 +38,7 @@ func WithRedirectLinks(l *url.URL) Option {
}
}

func WithStorageDriver(storageDriver storagedriver.StorageDriver) Option {
func WithStorageDriver(storageDriver *sss.SSS) Option {
return func(c *Cache) {
c.storageDriver = storageDriver
}
Expand Down Expand Up @@ -76,19 +75,9 @@ func (c *Cache) Redirect(ctx context.Context, blobPath string, referer string) (
return u.String(), nil
}

options := map[string]interface{}{
"method": http.MethodGet,
}

linkExpires := c.linkExpires
if linkExpires > 0 {
options["expiry"] = time.Now().Add(linkExpires)
}

if referer != "" {
options["referer"] = referer
}
u, err := c.storageDriver.URLFor(ctx, blobPath, options)
u, err := c.storageDriver.SignGet(blobPath, linkExpires)
if err != nil {
return "", err
}
Expand All @@ -104,11 +93,14 @@ func (c *Cache) Redirect(ctx context.Context, blobPath string, referer string) (
return u, nil
}

func (c *Cache) Writer(ctx context.Context, cachePath string, append bool) (storagedriver.FileWriter, error) {
return c.storageDriver.Writer(ctx, cachePath, append)
func (c *Cache) Writer(ctx context.Context, cachePath string, append bool) (sss.FileWriter, error) {
if append {
return c.storageDriver.WriterWithAppend(ctx, cachePath)
}
return c.storageDriver.Writer(ctx, cachePath)
}

func (c *Cache) BlobWriter(ctx context.Context, blob string, append bool) (storagedriver.FileWriter, error) {
func (c *Cache) BlobWriter(ctx context.Context, blob string, append bool) (sss.FileWriter, error) {
cachePath := blobCachePath(blob)

if append {
Expand All @@ -127,7 +119,7 @@ func (c *Cache) BlobWriter(ctx context.Context, blob string, append bool) (stora
}

func (c *Cache) put(ctx context.Context, cachePath string, r io.Reader, checkFunc func(int64) error) (int64, error) {
fw, err := c.storageDriver.Writer(ctx, cachePath, false)
fw, err := c.storageDriver.Writer(ctx, cachePath)
if err != nil {
return 0, err
}
Expand All @@ -137,19 +129,17 @@ func (c *Cache) put(ctx context.Context, cachePath string, r io.Reader, checkFun

n, err := io.CopyBuffer(fw, r, buf)
if err != nil {
fw.Cancel()
return 0, err
}

if checkFunc != nil {
err = checkFunc(n)
if err != nil {
fw.Cancel()
return 0, err
}
}

err = fw.Commit()
err = fw.Commit(ctx)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -183,23 +173,23 @@ func (c *Cache) Delete(ctx context.Context, cachePath string) error {
}

func (c *Cache) Get(ctx context.Context, cachePath string) (io.ReadCloser, error) {
return c.storageDriver.Reader(ctx, cachePath, 0)
return c.storageDriver.Reader(ctx, cachePath)
}

func (c *Cache) GetWithOffset(ctx context.Context, cachePath string, offset int64) (io.ReadCloser, error) {
return c.storageDriver.Reader(ctx, cachePath, offset)
return c.storageDriver.ReaderWithOffset(ctx, cachePath, offset)
}

func (c *Cache) GetContent(ctx context.Context, cachePath string) ([]byte, error) {
return c.storageDriver.GetContent(ctx, cachePath)
}

func (c *Cache) Stat(ctx context.Context, cachePath string) (storagedriver.FileInfo, error) {
func (c *Cache) Stat(ctx context.Context, cachePath string) (sss.FileInfo, error) {
return c.storageDriver.Stat(ctx, cachePath)
}

func (c *Cache) Walk(ctx context.Context, cachePath string, fun fs.WalkDirFunc) error {
return c.storageDriver.Walk(ctx, cachePath, func(fi storagedriver.FileInfo) error {
return c.storageDriver.Walk(ctx, cachePath, func(fi sss.FileInfo) error {
p := fi.Path()
fiw := fileInfoWrap{
name: path.Base(p),
Expand All @@ -211,12 +201,20 @@ func (c *Cache) Walk(ctx context.Context, cachePath string, fun fs.WalkDirFunc)
}

func (c *Cache) List(ctx context.Context, cachePath string) ([]string, error) {
return c.storageDriver.List(ctx, cachePath)
list := []string{}
err := c.storageDriver.List(ctx, cachePath, func(fileInfo sss.FileInfo) bool {
list = append(list, fileInfo.Path())
return true
})
if err != nil {
return nil, err
}
return list, nil
}

type fileInfoWrap struct {
name string
storagedriver.FileInfo
sss.FileInfo
}

var _ fs.DirEntry = (*fileInfoWrap)(nil)
Expand Down
4 changes: 2 additions & 2 deletions cache/cache_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"path"
"strings"

storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/wzshiming/sss"
)

func (c *Cache) RedirectBlob(ctx context.Context, blob string, referer string) (string, error) {
return c.Redirect(ctx, blobCachePath(blob), referer)
}

func (c *Cache) StatBlob(ctx context.Context, blob string) (storagedriver.FileInfo, error) {
func (c *Cache) StatBlob(ctx context.Context, blob string) (sss.FileInfo, error) {
return c.Stat(ctx, blobCachePath(blob))
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/crproxy/cluster/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"github.com/daocloud/crproxy/internal/server"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/signing"
"github.com/daocloud/crproxy/storage"
"github.com/daocloud/crproxy/token"
"github.com/daocloud/crproxy/transport"
"github.com/gorilla/handlers"
"github.com/spf13/cobra"
"github.com/wzshiming/httpseek"
"github.com/wzshiming/sss"
)

type flagpole struct {
Expand Down Expand Up @@ -119,7 +119,7 @@ func runE(ctx context.Context, flags *flagpole) error {
cache.WithSignLink(flags.SignLink),
}

sd, err := storage.NewStorage(flags.StorageURL)
sd, err := sss.NewSSS(sss.WithURL(flags.StorageURL))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func runE(ctx context.Context, flags *flagpole) error {

if flags.BigStorageURL != "" && flags.BigStorageSize > 0 {
bigCacheOpts := []cache.Option{}
sd, err := storage.NewStorage(flags.BigStorageURL)
sd, err := sss.NewSSS(sss.WithURL(flags.BigStorageURL))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/crproxy/cluster/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/signing"
"github.com/daocloud/crproxy/storage"
"github.com/daocloud/crproxy/token"
"github.com/daocloud/crproxy/transport"
"github.com/gorilla/handlers"
"github.com/spf13/cobra"
"github.com/wzshiming/httpseek"
"github.com/wzshiming/sss"
)

type flagpole struct {
Expand Down Expand Up @@ -173,7 +173,7 @@ func runE(ctx context.Context, flags *flagpole) error {
cache.WithSignLink(flags.SignLink),
}

sd, err := storage.NewStorage(flags.StorageURL)
sd, err := sss.NewSSS(sss.WithURL(flags.StorageURL))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func runE(ctx context.Context, flags *flagpole) error {

if flags.BigStorageURL != "" && flags.BigStorageSize > 0 {
bigCacheOpts := []cache.Option{}
sd, err := storage.NewStorage(flags.BigStorageURL)
sd, err := sss.NewSSS(sss.WithURL(flags.BigStorageURL))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/crproxy/cluster/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
"github.com/daocloud/crproxy/internal/spec"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/runner"
"github.com/daocloud/crproxy/storage"
"github.com/daocloud/crproxy/transport"
"github.com/spf13/cobra"
"github.com/wzshiming/httpseek"
"github.com/wzshiming/sss"
)

type flagpole struct {
Expand Down Expand Up @@ -86,7 +86,7 @@ func runE(ctx context.Context, flags *flagpole) error {

var caches []*cache.Cache
for _, s := range flags.StorageURL {
sd, err := storage.NewStorage(s)
sd, err := sss.NewSSS(sss.WithURL(s))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func runE(ctx context.Context, flags *flagpole) error {

if flags.BigStorageURL != "" && flags.BigStorageSize > 0 {
bigCacheOpts := []cache.Option{}
sd, err := storage.NewStorage(flags.BigStorageURL)
sd, err := sss.NewSSS(sss.WithURL(flags.BigStorageURL))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand All @@ -187,7 +187,7 @@ func runE(ctx context.Context, flags *flagpole) error {

if flags.ManifestStorageURL != "" {
manifestCacheOpts := []cache.Option{}
sd, err := storage.NewStorage(flags.ManifestStorageURL)
sd, err := sss.NewSSS(sss.WithURL(flags.ManifestStorageURL))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down
4 changes: 0 additions & 4 deletions cmd/crproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import (
csync "github.com/daocloud/crproxy/cmd/crproxy/sync"
"github.com/daocloud/crproxy/internal/signals"
"github.com/spf13/cobra"

_ "github.com/daocloud/crproxy/storage/driver/obs"
_ "github.com/daocloud/crproxy/storage/driver/oss"
_ "github.com/daocloud/crproxy/storage/driver/s3"
)

func init() {
Expand Down
4 changes: 2 additions & 2 deletions cmd/crproxy/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"strings"

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/storage"
csync "github.com/daocloud/crproxy/sync"
"github.com/daocloud/crproxy/transport"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/spf13/cobra"
"github.com/wzshiming/sss"
)

type flagpole struct {
Expand Down Expand Up @@ -69,7 +69,7 @@ func runE(ctx context.Context, flags *flagpole) error {

var caches []*cache.Cache
for _, s := range flags.StorageURL {
sd, err := storage.NewStorage(s)
sd, err := sss.NewSSS(sss.WithURL(s))
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
Expand Down
10 changes: 3 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
module github.com/daocloud/crproxy

go 1.23.4
go 1.24

require (
github.com/aws/aws-sdk-go v1.48.10
github.com/denverdino/aliyungo v0.0.0
github.com/distribution/reference v0.6.0
github.com/docker/distribution v2.8.2+incompatible
github.com/emicklei/go-restful-openapi/v2 v2.11.0
Expand All @@ -13,13 +11,13 @@ require (
github.com/go-sql-driver/mysql v1.8.1
github.com/google/go-containerregistry v0.20.2
github.com/gorilla/handlers v1.5.2
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible
github.com/opencontainers/go-digest v1.0.0
github.com/spf13/cobra v1.8.1
github.com/wzshiming/cmux v0.4.2
github.com/wzshiming/hostmatcher v0.0.3
github.com/wzshiming/httpseek v0.2.0
github.com/wzshiming/imc v0.0.0-20250106051804-1cb884b5184a
github.com/wzshiming/sss v0.0.0-20250217062824-3687ab53ed28
golang.org/x/crypto v0.28.0
golang.org/x/time v0.10.0
)
Expand All @@ -31,9 +29,9 @@ replace (

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/aws/aws-sdk-go v1.55.6 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/cli v27.1.1+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
Expand All @@ -50,9 +48,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
Expand Down
Loading

0 comments on commit 84ffcae

Please sign in to comment.