Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: ctx propagation; bmt fixes; pss generic notification framework #17150

Merged
merged 15 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/swarm/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

// +build linux darwin freebsd

package main

import (
Expand Down
3 changes: 2 additions & 1 deletion cmd/swarm/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package main

import (
"context"
"fmt"
"os"

Expand All @@ -39,7 +40,7 @@ func hash(ctx *cli.Context) {

stat, _ := f.Stat()
fileStore := storage.NewFileStore(storage.NewMapChunkStore(), storage.NewFileStoreParams())
addr, _, err := fileStore.Store(f, stat.Size(), false)
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions cmd/swarm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ var (
}
SwarmWantManifestFlag = cli.BoolTFlag{
Name: "manifest",
Usage: "Automatic manifest upload",
Usage: "Automatic manifest upload (default true)",
}
SwarmUploadDefaultPath = cli.StringFlag{
Name: "defaultpath",
Expand All @@ -155,7 +155,7 @@ var (
}
SwarmUploadMimeType = cli.StringFlag{
Name: "mime",
Usage: "force mime type",
Usage: "Manually specify MIME type",
}
SwarmEncryptedFlag = cli.BoolFlag{
Name: "encrypt",
Expand Down
8 changes: 7 additions & 1 deletion cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ import (
)

func generateEndpoints(scheme string, cluster string, from int, to int) {
if cluster == "prod" {
cluster = ""
} else {
cluster = cluster + "."
}

for port := from; port <= to; port++ {
endpoints = append(endpoints, fmt.Sprintf("%s://%v.%s.swarm-gateways.net", scheme, port, cluster))
endpoints = append(endpoints, fmt.Sprintf("%s://%v.%sswarm-gateways.net", scheme, port, cluster))
}

if includeLocalhost {
Expand Down
6 changes: 6 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ func CollectProcessMetrics(refresh time.Duration) {
memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry)

var diskReads, diskReadBytes, diskWrites, diskWriteBytes Meter
var diskReadBytesCounter, diskWriteBytesCounter Counter
if err := ReadDiskStats(diskstats[0]); err == nil {
diskReads = GetOrRegisterMeter("system/disk/readcount", DefaultRegistry)
diskReadBytes = GetOrRegisterMeter("system/disk/readdata", DefaultRegistry)
diskReadBytesCounter = GetOrRegisterCounter("system/disk/readbytes", DefaultRegistry)
diskWrites = GetOrRegisterMeter("system/disk/writecount", DefaultRegistry)
diskWriteBytes = GetOrRegisterMeter("system/disk/writedata", DefaultRegistry)
diskWriteBytesCounter = GetOrRegisterCounter("system/disk/writebytes", DefaultRegistry)
} else {
log.Debug("Failed to read disk metrics", "err", err)
}
Expand All @@ -82,6 +85,9 @@ func CollectProcessMetrics(refresh time.Duration) {
diskReadBytes.Mark(diskstats[location1].ReadBytes - diskstats[location2].ReadBytes)
diskWrites.Mark(diskstats[location1].WriteCount - diskstats[location2].WriteCount)
diskWriteBytes.Mark(diskstats[location1].WriteBytes - diskstats[location2].WriteBytes)

diskReadBytesCounter.Inc(diskstats[location1].ReadBytes - diskstats[location2].ReadBytes)
diskWriteBytesCounter.Inc(diskstats[location1].WriteBytes - diskstats[location2].WriteBytes)
}
time.Sleep(refresh)
}
Expand Down
75 changes: 38 additions & 37 deletions swarm/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,28 +227,28 @@ func NewAPI(fileStore *storage.FileStore, dns Resolver, resourceHandler *mru.Han
}

// Upload to be used only in TEST
func (a *API) Upload(uploadDir, index string, toEncrypt bool) (hash string, err error) {
func (a *API) Upload(ctx context.Context, uploadDir, index string, toEncrypt bool) (hash string, err error) {
fs := NewFileSystem(a)
hash, err = fs.Upload(uploadDir, index, toEncrypt)
return hash, err
}

// Retrieve FileStore reader API
func (a *API) Retrieve(addr storage.Address) (reader storage.LazySectionReader, isEncrypted bool) {
return a.fileStore.Retrieve(addr)
func (a *API) Retrieve(ctx context.Context, addr storage.Address) (reader storage.LazySectionReader, isEncrypted bool) {
return a.fileStore.Retrieve(ctx, addr)
}

// Store wraps the Store API call of the embedded FileStore
func (a *API) Store(data io.Reader, size int64, toEncrypt bool) (addr storage.Address, wait func(), err error) {
func (a *API) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr storage.Address, wait func(ctx context.Context) error, err error) {
log.Debug("api.store", "size", size)
return a.fileStore.Store(data, size, toEncrypt)
return a.fileStore.Store(ctx, data, size, toEncrypt)
}

// ErrResolve is returned when an URI cannot be resolved from ENS.
type ErrResolve error

// Resolve resolves a URI to an Address using the MultiResolver.
func (a *API) Resolve(uri *URI) (storage.Address, error) {
func (a *API) Resolve(ctx context.Context, uri *URI) (storage.Address, error) {
apiResolveCount.Inc(1)
log.Trace("resolving", "uri", uri.Addr)

Expand Down Expand Up @@ -286,34 +286,37 @@ func (a *API) Resolve(uri *URI) (storage.Address, error) {
}

// Put provides singleton manifest creation on top of FileStore store
func (a *API) Put(content, contentType string, toEncrypt bool) (k storage.Address, wait func(), err error) {
func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := a.fileStore.Store(r, int64(len(content)), toEncrypt)
key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.fileStore.Store(r, int64(len(manifest)), toEncrypt)
key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
return key, func() {
waitContent()
waitManifest()
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}

// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.LazySectionReader, mimeType string, status int, contentAddr storage.Address, err error) {
func (a *API) Get(ctx context.Context, manifestAddr storage.Address, path string) (reader storage.LazySectionReader, mimeType string, status int, contentAddr storage.Address, err error) {
log.Debug("api.get", "key", manifestAddr, "path", path)
apiGetCount.Inc(1)
trie, err := loadManifest(a.fileStore, manifestAddr, nil)
trie, err := loadManifest(ctx, a.fileStore, manifestAddr, nil)
if err != nil {
apiGetNotFound.Inc(1)
status = http.StatusNotFound
Expand Down Expand Up @@ -375,7 +378,7 @@ func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.Laz
log.Trace("resource is multihash", "key", manifestAddr)

// get the manifest the multihash digest points to
trie, err := loadManifest(a.fileStore, manifestAddr, nil)
trie, err := loadManifest(ctx, a.fileStore, manifestAddr, nil)
if err != nil {
apiGetNotFound.Inc(1)
status = http.StatusNotFound
Expand Down Expand Up @@ -410,7 +413,7 @@ func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.Laz
}
mimeType = entry.ContentType
log.Debug("content lookup key", "key", contentAddr, "mimetype", mimeType)
reader, _ = a.fileStore.Retrieve(contentAddr)
reader, _ = a.fileStore.Retrieve(ctx, contentAddr)
} else {
// no entry found
status = http.StatusNotFound
Expand All @@ -422,10 +425,10 @@ func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.Laz
}

// Modify loads manifest and checks the content hash before recalculating and storing the manifest.
func (a *API) Modify(addr storage.Address, path, contentHash, contentType string) (storage.Address, error) {
func (a *API) Modify(ctx context.Context, addr storage.Address, path, contentHash, contentType string) (storage.Address, error) {
apiModifyCount.Inc(1)
quitC := make(chan bool)
trie, err := loadManifest(a.fileStore, addr, quitC)
trie, err := loadManifest(ctx, a.fileStore, addr, quitC)
if err != nil {
apiModifyFail.Inc(1)
return nil, err
Expand All @@ -449,15 +452,15 @@ func (a *API) Modify(addr storage.Address, path, contentHash, contentType string
}

// AddFile creates a new manifest entry, adds it to swarm, then adds a file to swarm.
func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bool) (storage.Address, string, error) {
func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []byte, nameresolver bool) (storage.Address, string, error) {
apiAddFileCount.Inc(1)

uri, err := Parse("bzz:/" + mhash)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
}
mkey, err := a.Resolve(uri)
mkey, err := a.Resolve(ctx, uri)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
Expand All @@ -476,13 +479,13 @@ func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bo
ModTime: time.Now(),
}

mw, err := a.NewManifestWriter(mkey, nil)
mw, err := a.NewManifestWriter(ctx, mkey, nil)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
}

fkey, err := mw.AddEntry(bytes.NewReader(content), entry)
fkey, err := mw.AddEntry(ctx, bytes.NewReader(content), entry)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
Expand All @@ -496,19 +499,18 @@ func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bo
}

return fkey, newMkey.String(), nil

}

// RemoveFile removes a file entry in a manifest.
func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string, error) {
func (a *API) RemoveFile(ctx context.Context, mhash string, path string, fname string, nameresolver bool) (string, error) {
apiRmFileCount.Inc(1)

uri, err := Parse("bzz:/" + mhash)
if err != nil {
apiRmFileFail.Inc(1)
return "", err
}
mkey, err := a.Resolve(uri)
mkey, err := a.Resolve(ctx, uri)
if err != nil {
apiRmFileFail.Inc(1)
return "", err
Expand All @@ -519,7 +521,7 @@ func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string,
path = path[1:]
}

mw, err := a.NewManifestWriter(mkey, nil)
mw, err := a.NewManifestWriter(ctx, mkey, nil)
if err != nil {
apiRmFileFail.Inc(1)
return "", err
Expand All @@ -542,7 +544,7 @@ func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string,
}

// AppendFile removes old manifest, appends file entry to new manifest and adds it to Swarm.
func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content []byte, oldAddr storage.Address, offset int64, addSize int64, nameresolver bool) (storage.Address, string, error) {
func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existingSize int64, content []byte, oldAddr storage.Address, offset int64, addSize int64, nameresolver bool) (storage.Address, string, error) {
apiAppendFileCount.Inc(1)

buffSize := offset + addSize
Expand All @@ -552,7 +554,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content

buf := make([]byte, buffSize)

oldReader, _ := a.Retrieve(oldAddr)
oldReader, _ := a.Retrieve(ctx, oldAddr)
io.ReadAtLeast(oldReader, buf, int(offset))

newReader := bytes.NewReader(content)
Expand All @@ -575,7 +577,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
apiAppendFileFail.Inc(1)
return nil, "", err
}
mkey, err := a.Resolve(uri)
mkey, err := a.Resolve(ctx, uri)
if err != nil {
apiAppendFileFail.Inc(1)
return nil, "", err
Expand All @@ -586,7 +588,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
path = path[1:]
}

mw, err := a.NewManifestWriter(mkey, nil)
mw, err := a.NewManifestWriter(ctx, mkey, nil)
if err != nil {
apiAppendFileFail.Inc(1)
return nil, "", err
Expand All @@ -606,7 +608,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
ModTime: time.Now(),
}

fkey, err := mw.AddEntry(io.Reader(combinedReader), entry)
fkey, err := mw.AddEntry(ctx, io.Reader(combinedReader), entry)
if err != nil {
apiAppendFileFail.Inc(1)
return nil, "", err
Expand All @@ -620,23 +622,22 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
}

return fkey, newMkey.String(), nil

}

// BuildDirectoryTree used by swarmfs_unix
func (a *API) BuildDirectoryTree(mhash string, nameresolver bool) (addr storage.Address, manifestEntryMap map[string]*manifestTrieEntry, err error) {
func (a *API) BuildDirectoryTree(ctx context.Context, mhash string, nameresolver bool) (addr storage.Address, manifestEntryMap map[string]*manifestTrieEntry, err error) {

uri, err := Parse("bzz:/" + mhash)
if err != nil {
return nil, nil, err
}
addr, err = a.Resolve(uri)
addr, err = a.Resolve(ctx, uri)
if err != nil {
return nil, nil, err
}

quitC := make(chan bool)
rootTrie, err := loadManifest(a.fileStore, addr, quitC)
rootTrie, err := loadManifest(ctx, a.fileStore, addr, quitC)
if err != nil {
return nil, nil, fmt.Errorf("can't load manifest %v: %v", addr.String(), err)
}
Expand Down Expand Up @@ -725,8 +726,8 @@ func (a *API) ResourceIsValidated() bool {
}

// ResolveResourceManifest retrieves the Mutable Resource manifest for the given address, and returns the address of the metadata chunk.
func (a *API) ResolveResourceManifest(addr storage.Address) (storage.Address, error) {
trie, err := loadManifest(a.fileStore, addr, nil)
func (a *API) ResolveResourceManifest(ctx context.Context, addr storage.Address) (storage.Address, error) {
trie, err := loadManifest(ctx, a.fileStore, addr, nil)
if err != nil {
return nil, fmt.Errorf("cannot load resource manifest: %v", err)
}
Expand Down
13 changes: 8 additions & 5 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func expResponse(content string, mimeType string, status int) *Response {

func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
addr := storage.Address(common.Hex2Bytes(bzzhash))
reader, mimeType, status, _, err := api.Get(addr, path)
reader, mimeType, status, _, err := api.Get(context.TODO(), addr, path)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -109,12 +109,15 @@ func TestApiPut(t *testing.T) {
testAPI(t, func(api *API, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
// exp := expResponse([]byte(content), "text/plain", 0)
addr, wait, err := api.Put(content, exp.MimeType, toEncrypt)
ctx := context.TODO()
addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = wait(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wait()
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
})
Expand Down Expand Up @@ -226,7 +229,7 @@ func TestAPIResolve(t *testing.T) {
if x.immutable {
uri.Scheme = "bzz-immutable"
}
res, err := api.Resolve(uri)
res, err := api.Resolve(context.TODO(), uri)
if err == nil {
if x.expectErr != nil {
t.Fatalf("expected error %q, got result %q", x.expectErr, res)
Expand Down
Loading