Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm: push tags integration - request flow #1347

Merged
merged 18 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 31 additions & 53 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,20 @@ func init() {
}

func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
for _, v := range []bool{true, false} {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
}
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, v)
}
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, false)
f(api, tags, true)
}

type testResponse struct {
Expand Down Expand Up @@ -123,7 +124,6 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
}
reader.Seek(0, 0)
return &testResponse{reader, &Response{mimeType, status, size, string(s)}}
// return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}}
}

func TestApiPut(t *testing.T) {
Expand All @@ -148,23 +148,32 @@ func TestApiPut(t *testing.T) {

// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
ctx := context.TODO()
//(data length / 4096) + 128
// nr of data chunks divided by 128 for unencrypted, 64 for encrypted, till u get to 1, add one root chunk
_, wait, err := putRandomContent(ctx, api, 4096*4095, "text/plain", true)
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
tag, err := api.Tags.New("unnamed-tag", 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatal(err)
}
err = wait(ctx)
ctx := sctx.SetTag(context.Background(), tag.Uid)
key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatal(err)
}
err = waitContent(ctx)
if err != nil {
t.Fatal(err)
}
tag.DoneSplit(key)

if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
testutil.CheckTag(t, tag, expect, expect, 0, expect)
} else {
tag := tags.All()[0]
testutil.CheckTag(t, tag, 4129, 4129, 0, 4129)
//testutil.CheckTag() //whatever
expect := int64(4095 + 32 + 1)
testutil.CheckTag(t, tag, expect, expect, 0, expect)
}
})
}
Expand Down Expand Up @@ -538,37 +547,6 @@ func TestDetectContentType(t *testing.T) {
}
}

// putRandomContent provides singleton manifest creation on top of API. it uploads an arbitrary byte stream
// of the desired contentLength and wraps it in a manifest
func putRandomContent(ctx context.Context, a *API, contentLength int, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))

tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, randomContentReader, int64(contentLength), toEncrypt)
if err != nil {
return nil, nil, err
}
//manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
//r := strings.NewReader(manifest)
//key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
return waitContent(ctx)
/* err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)*/
}, nil
}

// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
Expand Down
9 changes: 4 additions & 5 deletions swarm/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/pborman/uuid"
Expand All @@ -49,8 +50,6 @@ var (
ErrUnauthorized = errors.New("unauthorized")
)

const SwarmTagHeaderName = "x-swarm-tag"

func NewClient(gateway string) *Client {
return &Client{
Gateway: gateway,
Expand All @@ -77,7 +76,7 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err
return "", err
}
req.ContentLength = size
req.Header.Set(SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))

res, err := http.DefaultClient.Do(req)
if err != nil {
Expand Down Expand Up @@ -539,7 +538,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
}
log.Trace("setting upload tag", "tag", tag)

req.Header.Set(SwarmTagHeaderName, tag)
req.Header.Set(swarmhttp.SwarmTagHeaderName, tag)

// use 'Expect: 100-continue' so we don't send the request body if
// the server refuses the request
Expand Down Expand Up @@ -606,7 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error)

mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
req.Header.Set(SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))

// define an UploadFn which adds files to the multipart form
uploadFn := func(file *File) error {
Expand Down
29 changes: 14 additions & 15 deletions swarm/api/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"runtime/debug"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -93,10 +92,9 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
var (
tagName string
err error
estimatedTotal = 0
contentType = r.Header.Get("Content-Type")
contentLength = r.Header.Get("Content-Length")
headerTag = r.Header.Get(SwarmTagHeaderName)
estimatedTotal int64 = 0
contentType = r.Header.Get("Content-Type")
headerTag = r.Header.Get(SwarmTagHeaderName)
)
if headerTag != "" {
tagName = headerTag
Expand All @@ -105,18 +103,19 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
tagName = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
}

log.Trace("trying to estimate tag size", "contentType", contentType, "contentLength", contentLength, "cl", r.ContentLength)

if !strings.Contains(contentType, "multipart") && contentLength != "" {
estimatedTotal, err = strconv.Atoi(contentLength)
if err != nil {
log.Error("error parsing content-length string, falling back to 0", "contentLength", contentLength)
estimatedTotal = 0
} else {
estimatedTotal = estimatedTotal / 4096
if !strings.Contains(contentType, "multipart") && r.ContentLength > 0 {
log.Trace("calculating tag size", "contentType", contentType, "contentLength", r.ContentLength)
uri := GetURI(r.Context())
if uri != nil {
log.Debug("got uri from context")
if uri.Addr == "encrypt" {
estimatedTotal = CalculateNumberOfChunks(r.ContentLength, true)
} else {
estimatedTotal = CalculateNumberOfChunks(r.ContentLength, false)
}
}

}

log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)

t, err := tags.New(tagName, estimatedTotal)
Expand Down
25 changes: 24 additions & 1 deletion swarm/api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"mime"
"mime/multipart"
"net/http"
Expand Down Expand Up @@ -272,13 +273,14 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
return
}

addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
if err != nil {
postRawFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}

wait(r.Context())
tag.DoneSplit(addr)

log.Debug("stored content", "ruid", ruid, "key", addr)
Expand Down Expand Up @@ -863,6 +865,27 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize))
}

func CalculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
if contentLength < 4096 {
return 1
}
branchingFactor := 128
if isEncrypted {
branchingFactor = 64
}

dataChunks := math.Ceil(float64(contentLength) / float64(4096))
totalChunks := dataChunks
intermediate := float64(dataChunks) / float64(branchingFactor)

for intermediate > 1 {
totalChunks += math.Ceil(intermediate)
intermediate = intermediate / float64(branchingFactor)
}

return int64(totalChunks) + 1
}

// The size of buffer used for bufio.Reader on LazyChunkReader passed to
// http.ServeContent in HandleGetFile.
// Warning: This value influences the number of chunk requests and chunker join goroutines
Expand Down
Loading