Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm: push tags integration - request flow #1347

Merged
merged 18 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/swarm/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand All @@ -47,7 +48,7 @@ func hashes(ctx *cli.Context) {
}
defer f.Close()

fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
refs, err := fileStore.GetAllReferences(context.TODO(), f, false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/swarm/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
return nil, err
}
Expand Down
32 changes: 4 additions & 28 deletions swarm/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand All @@ -53,8 +54,6 @@ import (
var (
apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil)
apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil)
apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil)
apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil)
apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil)
apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil)
apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil)
Expand Down Expand Up @@ -188,15 +187,17 @@ type API struct {
feed *feed.Handler
fileStore *storage.FileStore
dns Resolver
Tags *chunk.Tags
Decryptor func(context.Context, string) DecryptFunc
}

// NewAPI the api constructor initialises a new API instance.
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) {
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) {
self = &API{
fileStore: fileStore,
dns: dns,
feed: feedHandler,
Tags: tags,
Decryptor: func(ctx context.Context, credentials string) DecryptFunc {
return self.doDecrypt(ctx, credentials, pk)
},
Expand Down Expand Up @@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto
return addr, nil
}

// Put provides singleton manifest creation on top of FileStore store
func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method was only used by tests. i think we should not export test helpers on public APIs. hence the functionality was removed from the API struct and duplicated wherever necessary

apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}

// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
Expand Down
114 changes: 105 additions & 9 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ package api
import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"strings"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/testutil"
)

func init() {
Expand All @@ -41,26 +45,34 @@ func init() {
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))))
}

func testAPI(t *testing.T, f func(*API, bool)) {
func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
}
api := NewAPI(fileStore, nil, nil, nil)
f(api, false)
f(api, true)
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, false)
f(api, tags, true)
}

type testResponse struct {
reader storage.LazySectionReader
*Response
}

type Response struct {
MimeType string
Status int
Size int64
Content string
}

func checkResponse(t *testing.T, resp *testResponse, exp *Response) {

if resp.MimeType != exp.MimeType {
Expand Down Expand Up @@ -115,11 +127,11 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
}

func TestApiPut(t *testing.T) {
testAPI(t, func(api *API, toEncrypt bool) {
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
ctx := context.TODO()
addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -129,6 +141,31 @@ func TestApiPut(t *testing.T) {
}
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
})
}

// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
func TestApiTagLarge(t *testing.T) {
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
ctx := context.TODO()
//(data length / 4096) + 128
// nr of data chunks divided by 128 for unencrypted, 64 for encrypted, till u get to 1, add one root chunk
_, wait, err := putRandomContent(ctx, api, 4096*4095, "text/plain", true)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = wait(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if toEncrypt {
} else {
tag := tags.All()[0]
testutil.CheckTag(t, tag, 4129, 4129, 0, 4129)
//testutil.CheckTag() //whatever
}
})
}

Expand Down Expand Up @@ -391,7 +428,7 @@ func TestDecryptOriginForbidden(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -425,7 +462,7 @@ func TestDecryptOrigin(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -500,3 +537,62 @@ func TestDetectContentType(t *testing.T) {
})
}
}

// putRandomContent provides singleton manifest creation on top of API. it uploads an arbitrary byte stream
// of the desired contentLength and wraps it in a manifest
func putRandomContent(ctx context.Context, a *API, contentLength int, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))

tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, randomContentReader, int64(contentLength), toEncrypt)
if err != nil {
return nil, nil, err
}
//manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
//r := strings.NewReader(manifest)
//key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
return waitContent(ctx)
/* err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)*/
}, nil
}

// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}
Loading