Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm: push tags integration - request flow #1347

Merged
merged 18 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/swarm/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand All @@ -47,7 +48,7 @@ func hashes(ctx *cli.Context) {
}
defer f.Close()

fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
refs, err := fileStore.GetAllReferences(context.TODO(), f, false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/swarm/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
return nil, err
}
Expand Down
32 changes: 4 additions & 28 deletions swarm/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand All @@ -53,8 +54,6 @@ import (
var (
apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil)
apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil)
apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil)
apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil)
apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil)
apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil)
apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil)
Expand Down Expand Up @@ -188,15 +187,17 @@ type API struct {
feed *feed.Handler
fileStore *storage.FileStore
dns Resolver
Tags *chunk.Tags
Decryptor func(context.Context, string) DecryptFunc
}

// NewAPI the api constructor initialises a new API instance.
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) {
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) {
self = &API{
fileStore: fileStore,
dns: dns,
feed: feedHandler,
Tags: tags,
Decryptor: func(ctx context.Context, credentials string) DecryptFunc {
return self.doDecrypt(ctx, credentials, pk)
},
Expand Down Expand Up @@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto
return addr, nil
}

// Put provides singleton manifest creation on top of FileStore store
func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method was only used by tests. i think we should not export test helpers on public APIs. hence the functionality was removed from the API struct and duplicated wherever necessary

apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}

// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
Expand Down
108 changes: 91 additions & 17 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ package api
import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"strings"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/testutil"
)

func init() {
Expand All @@ -41,26 +45,35 @@ func init() {
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))))
}

func testAPI(t *testing.T, f func(*API, bool)) {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
if err != nil {
return
func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
for _, v := range []bool{true, false} {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
}
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, v)
}
api := NewAPI(fileStore, nil, nil, nil)
f(api, false)
f(api, true)
}

type testResponse struct {
reader storage.LazySectionReader
*Response
}

type Response struct {
MimeType string
Status int
Size int64
Content string
}

func checkResponse(t *testing.T, resp *testResponse, exp *Response) {

if resp.MimeType != exp.MimeType {
Expand Down Expand Up @@ -111,15 +124,14 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
}
reader.Seek(0, 0)
return &testResponse{reader, &Response{mimeType, status, size, string(s)}}
// return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}}
}

func TestApiPut(t *testing.T) {
testAPI(t, func(api *API, toEncrypt bool) {
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
ctx := context.TODO()
addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -129,6 +141,40 @@ func TestApiPut(t *testing.T) {
}
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
})
}

// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
tag, err := api.Tags.New("unnamed-tag", 0)
if err != nil {
t.Fatal(err)
}
ctx := sctx.SetTag(context.Background(), tag.Uid)
key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt)
if err != nil {
t.Fatal(err)
}
err = waitContent(ctx)
if err != nil {
t.Fatal(err)
}
tag.DoneSplit(key)

if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
testutil.CheckTag(t, tag, expect, expect, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
testutil.CheckTag(t, tag, expect, expect, 0, expect)
}
})
}

Expand Down Expand Up @@ -391,7 +437,7 @@ func TestDecryptOriginForbidden(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -425,7 +471,7 @@ func TestDecryptOrigin(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -500,3 +546,31 @@ func TestDetectContentType(t *testing.T) {
})
}
}

// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}
32 changes: 32 additions & 0 deletions swarm/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -75,6 +76,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err
return "", err
}
req.ContentLength = size
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))

res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
Expand Down Expand Up @@ -111,6 +114,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) {
type File struct {
io.ReadCloser
api.ManifestEntry
Tag string
}

// Open opens a local file which can then be passed to client.Upload to upload
Expand Down Expand Up @@ -139,6 +143,7 @@ func Open(path string) (*File, error) {
Size: stat.Size(),
ModTime: stat.ModTime(),
},
Tag: filepath.Base(path),
}, nil
}

Expand Down Expand Up @@ -422,6 +427,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro
// Uploader uploads files to swarm using a provided UploadFn
type Uploader interface {
Upload(UploadFn) error
Tag() string
}

type UploaderFunc func(UploadFn) error
Expand All @@ -430,12 +436,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error {
return u(upload)
}

func (u UploaderFunc) Tag() string {
return fmt.Sprintf("multipart_upload_%d", time.Now().Unix())
}

// DirectoryUploader implements Uploader
var _ Uploader = &DirectoryUploader{}

// DirectoryUploader uploads all files in a directory, optionally uploading
// a file to the default path
type DirectoryUploader struct {
Dir string
}

func (d *DirectoryUploader) Tag() string {
return filepath.Base(d.Dir)
}

// Upload performs the upload of the directory and default path
func (d *DirectoryUploader) Upload(upload UploadFn) error {
return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error {
Expand All @@ -458,11 +475,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error {
})
}

var _ Uploader = &FileUploader{}

// FileUploader uploads a single file
type FileUploader struct {
File *File
}

func (f *FileUploader) Tag() string {
return f.File.Tag
}

// Upload performs the upload of the file
func (f *FileUploader) Upload(upload UploadFn) error {
return upload(f.File)
Expand Down Expand Up @@ -509,6 +532,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
req.URL.RawQuery = q.Encode()
}

tag := uploader.Tag()
if tag == "" {
tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix())
}
log.Trace("setting upload tag", "tag", tag)

req.Header.Set(swarmhttp.SwarmTagHeaderName, tag)

// use 'Expect: 100-continue' so we don't send the request body if
// the server refuses the request
req.Header.Set("Expect", "100-continue")
Expand Down Expand Up @@ -574,6 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error)

mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))

// define an UploadFn which adds files to the multipart form
uploadFn := func(file *File) error {
Expand Down
Loading