Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm: push tags integration - request flow #1347

Merged
merged 18 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,26 +141,32 @@ func TestApiPut(t *testing.T) {
}
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
testutil.CheckTag(t, tags, chunk.SPLIT, 2, 2) //1 chunk data, 1 chunk manifest
tag := tags.All()[0]
testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
})
}

// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
func xTestApiTagLarge(t *testing.T) {
func TestApiTagLarge(t *testing.T) {
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
ctx := context.TODO()
_, wait, err := putRandomContent(ctx, api, 4096*4095, "text/plain", toEncrypt)
//(data length / 4096) + 128
// nr of data chunks divided by 128 for unencrypted, 64 for encrypted, till u get to 1, add one root chunk
_, wait, err := putRandomContent(ctx, api, 4096*4095, "text/plain", true)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = wait(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
testutil.CheckTag(t, tags, chunk.SPLIT, 4129, 4129) //11 chunks random data, 1 chunk manifest
testutil.CheckTag(t, tags, chunk.SEEN, 0, 4129) //0 chunks seen, 12 total
if toEncrypt {
} else {
tag := tags.All()[0]
testutil.CheckTag(t, tag, 4129, 4129, 0, 4129)
//testutil.CheckTag() //whatever
}
})

}

// testResolver implements the Resolver interface and either returns the given
Expand Down Expand Up @@ -546,19 +552,20 @@ func putRandomContent(ctx context.Context, a *API, contentLength int, contentTyp
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r := strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
//manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
//r := strings.NewReader(manifest)
//key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
return waitContent(ctx)
/* err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)*/
}, nil
}

Expand Down
8 changes: 5 additions & 3 deletions swarm/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
ErrUnauthorized = errors.New("unauthorized")
)

const SwarmTagHeaderName = "x-swarm-tag"

func NewClient(gateway string) *Client {
return &Client{
Gateway: gateway,
Expand All @@ -75,7 +77,7 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err
return "", err
}
req.ContentLength = size
req.Header.Set("x-swarm-tag", fmt.Sprintf("raw_upload_%d", time.Now().Unix()))
req.Header.Set(SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))

res, err := http.DefaultClient.Do(req)
if err != nil {
Expand Down Expand Up @@ -537,7 +539,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
}
log.Trace("setting upload tag", "tag", tag)

req.Header.Set("x-swarm-tag", tag)
req.Header.Set(SwarmTagHeaderName, tag)

// use 'Expect: 100-continue' so we don't send the request body if
// the server refuses the request
Expand Down Expand Up @@ -604,7 +606,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error)

mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
req.Header.Set("x-swarm-tag", fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))
req.Header.Set(SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))

// define an UploadFn which adds files to the multipart form
uploadFn := func(file *File) error {
Expand Down
48 changes: 6 additions & 42 deletions swarm/api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/swarm/api"
swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
Expand Down Expand Up @@ -68,7 +67,8 @@ func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) {
}

// check the tag was created successfully
checkTag(t, srv.Tags, 1, 1, 0, 1)
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 1, 1, 0, 1)

// check we can download the same data
res, isEncrypted, err := client.DownloadRaw(hash)
Expand Down Expand Up @@ -212,7 +212,8 @@ func TestClientUploadDownloadDirectory(t *testing.T) {
}

// check the tag was created successfully
checkTag(t, srv.Tags, 9, 9, 0, 9)
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 9, 9, 0, 9)

// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
Expand Down Expand Up @@ -355,7 +356,8 @@ func TestClientMultipartUpload(t *testing.T) {
}

// check the tag was created successfully
checkTag(t, srv.Tags, 9, 9, 7, 9)
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 9, 9, 7, 9)

// check we can download the individual files
checkDownloadFile := func(path string) {
Expand Down Expand Up @@ -604,41 +606,3 @@ func TestClientCreateUpdateFeed(t *testing.T) {
t.Fatalf("Expected: %v, got %v", databytes, gotData)
}
}

func checkTag(t *testing.T, tags *chunk.Tags, split, stored, seen, total int) {
t.Helper()
i := 0
// check that the tag was created and incremented accordingly
tags.Range(func(k, v interface{}) bool {
vv, ok := v.(*chunk.Tag)
if !ok {
t.Fatal("error unmarshalling tag pointer")
}

tSplit := vv.Get(chunk.SPLIT)
if tSplit != split {
t.Fatalf("should have had split chunks, got %d want %d", tSplit, split)
}

tSeen := vv.Get(chunk.SEEN)
if tSeen != seen {
t.Fatalf("should have had seen chunks, got %d want %d", tSeen, seen)
}

tStored := vv.Get(chunk.STORED)
if tStored != stored {
t.Fatalf("mismatch stored chunks, got %d want %d", tStored, stored)
}

tTotal := vv.Total()
if tTotal != total {
t.Fatalf("mismatch total chunks, got %d want %d", tTotal, total)
}
i++

return false
})
if i == 0 {
t.Fatal("no tags found")
}
}
4 changes: 2 additions & 2 deletions swarm/api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
respondError(w, r, fmt.Sprintf("cannot create manifest: %s", err), http.StatusInternalServerError)
return
}

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)

if err != nil {
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
}

log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.SPLIT), "TOTAL", tag.Total())
log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total())
tag.DoneSplit(newAddr)

log.Debug("stored content", "ruid", ruid, "key", newAddr)
Expand Down
11 changes: 6 additions & 5 deletions swarm/api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -767,7 +766,8 @@ func testBzzTar(encrypted bool, t *testing.T) {
}

// check that the tag was written correctly
testutil.CheckTag(t, srv.Tags, chunk.SPLIT, 4, 4)
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 4, 4, 0, 4)

swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
Expand Down Expand Up @@ -856,14 +856,15 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
c := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req, err := http.NewRequest("POST", srv.URL+"/bzz:/", pr)
if err != nil {
t.Fatal(err)
}

req = req.WithContext(ctx)
req.ContentLength = 1000000
req.Header.Add("x-swarm-tag", "1000000")
req.Header.Add(SwarmTagHeaderName, "1000000")

go func() {
for {
Expand All @@ -886,9 +887,9 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
testutil.CheckTag(t, srv.Tags, chunk.SEEN, 0, 244)
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 0, 0, 0, 244)
close(c)
cancel()
}

// TestBzzRootRedirect tests that getting the root path of a manifest without
Expand Down
34 changes: 17 additions & 17 deletions swarm/chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ var (
type State = uint32

const (
SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
STORED // chunk stored locally
SEEN // chunk previously seen
SENT // chunk sent to neighbourhood
SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
StateSplit State = iota // chunk has been processed by filehasher/swarm safe call
StateStored // chunk stored locally
StateSeen // chunk previously seen
StateSent // chunk sent to neighbourhood
StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere
)

// Tag represents info on the status of new chunks
Expand Down Expand Up @@ -71,15 +71,15 @@ func NewTag(uid uint32, s string, total uint32) *Tag {
func (t *Tag) Inc(state State) {
var v *uint32
switch state {
case SPLIT:
case StateSplit:
v = &t.split
case STORED:
case StateStored:
v = &t.stored
case SEEN:
case StateSeen:
v = &t.seen
case SENT:
case StateSent:
v = &t.sent
case SYNCED:
case StateSynced:
v = &t.synced
}
atomic.AddUint32(v, 1)
Expand All @@ -89,15 +89,15 @@ func (t *Tag) Inc(state State) {
func (t *Tag) Get(state State) int {
var v *uint32
switch state {
case SPLIT:
case StateSplit:
v = &t.split
case STORED:
case StateStored:
v = &t.stored
case SEEN:
case StateSeen:
v = &t.seen
case SENT:
case StateSent:
v = &t.sent
case SYNCED:
case StateSynced:
v = &t.synced
}
return int(atomic.LoadUint32(v))
Expand All @@ -124,9 +124,9 @@ func (t *Tag) Status(state State) (int, int, error) {
return count, total, errNA
}
switch state {
case SPLIT, STORED, SEEN:
case StateSplit, StateStored, StateSeen:
return count, total, nil
case SENT, SYNCED:
case StateSent, StateSynced:
stored := int(atomic.LoadUint32(&t.stored))
if stored < total {
return count, total - seen, errNA
Expand Down
36 changes: 18 additions & 18 deletions swarm/chunk/tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var (
allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED}
allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced}
)

// TestTagSingleIncrements tests if Inc increments the tag state value
Expand All @@ -37,11 +37,11 @@ func TestTagSingleIncrements(t *testing.T) {
expcount int
exptotal int
}{
{state: SPLIT, inc: 10, expcount: 10, exptotal: 10},
{state: STORED, inc: 9, expcount: 9, exptotal: 9},
{state: SEEN, inc: 1, expcount: 1, exptotal: 10},
{state: SENT, inc: 9, expcount: 9, exptotal: 9},
{state: SYNCED, inc: 9, expcount: 9, exptotal: 9},
{state: StateSplit, inc: 10, expcount: 10, exptotal: 10},
{state: StateStored, inc: 9, expcount: 9, exptotal: 9},
{state: StateSeen, inc: 1, expcount: 1, exptotal: 10},
{state: StateSent, inc: 9, expcount: 9, exptotal: 9},
{state: StateSynced, inc: 9, expcount: 9, exptotal: 9},
}

for _, tc := range tc {
Expand All @@ -60,24 +60,24 @@ func TestTagSingleIncrements(t *testing.T) {
// TestTagStatus is a unit test to cover Tag.Status method functionality
func TestTagStatus(t *testing.T) {
tg := &Tag{total: 10}
tg.Inc(SEEN)
tg.Inc(SENT)
tg.Inc(SYNCED)
tg.Inc(StateSeen)
tg.Inc(StateSent)
tg.Inc(StateSynced)

for i := 0; i < 10; i++ {
tg.Inc(SPLIT)
tg.Inc(STORED)
tg.Inc(StateSplit)
tg.Inc(StateStored)
}
for _, v := range []struct {
state State
expVal int
expTotal int
}{
{state: STORED, expVal: 10, expTotal: 10},
{state: SPLIT, expVal: 10, expTotal: 10},
{state: SEEN, expVal: 1, expTotal: 10},
{state: SENT, expVal: 1, expTotal: 9},
{state: SYNCED, expVal: 1, expTotal: 9},
{state: StateStored, expVal: 10, expTotal: 10},
{state: StateSplit, expVal: 10, expTotal: 10},
{state: StateSeen, expVal: 1, expTotal: 10},
{state: StateSent, expVal: 1, expTotal: 9},
{state: StateSynced, expVal: 1, expTotal: 9},
} {
val, total, err := tg.Status(v.state)
if err != nil {
Expand All @@ -98,8 +98,8 @@ func TestTagETA(t *testing.T) {
maxDiff := 100000 // 100 microsecond
tg := &Tag{total: 10, startedAt: now}
time.Sleep(100 * time.Millisecond)
tg.Inc(SPLIT)
eta, err := tg.ETA(SPLIT)
tg.Inc(StateSplit)
eta, err := tg.ETA(StateSplit)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading