diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go
index 631d27f095..50991a7b99 100644
--- a/swarm/pss/pss.go
+++ b/swarm/pss/pss.go
@@ -969,6 +969,7 @@ func (p *Pss) forward(msg *PssMsg) error {
return false // stop iterating
}
if sendFunc(p, sp, msg) {
+ log.Trace("forwarding", "self", label(p.BaseAddr()), "to", label(sp.Address()), "msg", msg.Payload.Hash().Hex()[:4])
sent++
if onlySendOnce {
return false
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index bcc29d8cc7..aaf551b813 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -237,38 +237,6 @@ func benchmarkStoreGet(store ChunkStore, n int, chunksize int64, b *testing.B) {
}
}
-// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory.
-type MapChunkStore struct {
- chunks map[string]Chunk
- mu sync.RWMutex
-}
-
-func NewMapChunkStore() *MapChunkStore {
- return &MapChunkStore{
- chunks: make(map[string]Chunk),
- }
-}
-
-func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- m.chunks[ch.Address().Hex()] = ch
- return nil
-}
-
-func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- chunk := m.chunks[ref.Hex()]
- if chunk == nil {
- return nil, ErrChunkNotFound
- }
- return chunk, nil
-}
-
-func (m *MapChunkStore) Close() {
-}
-
func chunkAddresses(chunks []Chunk) []Address {
addrs := make([]Address, len(chunks))
for i, ch := range chunks {
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index d792352252..740d405054 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -24,6 +24,7 @@ import (
"encoding/binary"
"fmt"
"io"
+ "sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/bmt"
@@ -322,3 +323,35 @@ func (f *FakeChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
// Close doesn't store anything it is just here to implement ChunkStore
func (f *FakeChunkStore) Close() {
}
+
+// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory.
+type MapChunkStore struct {
+ chunks map[string]Chunk
+ mu sync.RWMutex
+}
+
+func NewMapChunkStore() *MapChunkStore {
+ return &MapChunkStore{
+ chunks: make(map[string]Chunk),
+ }
+}
+
+func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.chunks[ch.Address().Hex()] = ch
+ return nil
+}
+
+func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ chunk := m.chunks[ref.Hex()]
+ if chunk == nil {
+ return nil, ErrChunkNotFound
+ }
+ return chunk, nil
+}
+
+func (m *MapChunkStore) Close() {
+}
diff --git a/swarm/syncer/db.go b/swarm/syncer/db.go
new file mode 100644
index 0000000000..d5c07425ef
--- /dev/null
+++ b/swarm/syncer/db.go
@@ -0,0 +1,431 @@
+package syncer
+
+import (
+ "context"
+ "encoding/binary"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb" // "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+
+const (
+ batchSize = 256 // maximum number of chunk hashes to keep in memory per iteration batch
+ bufferSize = 16 // maximum number of items buffered for sending while the next batch is retrieved
+)
+
+// State is the enum type for chunk states
+type State = uint32
+
+const (
+ SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
+ STORED // chunk stored locally
+ SENT // chunk sent to neighbourhood
+ SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
+)
+
+var (
+ retryInterval = 30 * time.Second // time to wait before retrying to sync a chunk
+)
+
+var (
+ indexKey = []byte{1} // fixed key to store db storage index
+ sizeKey = []byte{2} // fixed key to store db size
+)
+
+// item holds info about a chunk. Addr and Tag are exported so that they are
+// rlp-serialised for disk storage in the DB;
+// the remaining fields are only used for the in-memory cache
+type item struct {
+ Addr storage.Address // chunk address
+ Tag string // tag to track batches of chunks (allows for sync ETA)
+ PO uint // distance
+ key []byte // key to look up the item in the DB
+ chunk storage.Chunk // chunk is retrieved when item is popped from batch and pushed to buffer
+ sentAt time.Time // time chunk is sent to a peer for syncing
+ state uint32 // state
+}
+
+// newKey increments the db storage index and creates a byte slice key from it
+// by prefixing the big-endian binary serialisation with a 0 byte
+func (db *DB) newKey() []byte {
+ index := atomic.AddInt64(&db.index, 1)
+ key := make([]byte, 9)
+ binary.BigEndian.PutUint64(key[1:], uint64(index))
+ return key
+}
+
+// bytes serialises item to []byte using rlp
+func (pi *item) bytes() []byte {
+ buf, err := rlp.EncodeToBytes(pi)
+ if err != nil {
+ panic(err.Error())
+ }
+ return buf
+}
+
+// DB implements a persisted FIFO queue that schedules one repeatable task on each
+// element in order of insertion:
+// - it iterates over items put in the db in order of storage and
+//   calls a task function on them
+// - it listens on a channel for asynchronous task completion signals
+//   and deletes completed items from storage
+// - it retries the task on items after a delay not shorter than retryInterval
+// - it persists across sessions
+// - it calls tags to update state counts for a tag
+type DB struct {
+ db *storage.LDBDatabase // the underlying database
+ tags *tags // tags info on processing rates
+ chunkStore storage.ChunkStore // chunkstore to get the chunks from and put into
+ batchC chan *storeBatch // channel to pass current batch from listen loop to batch write loop
+ requestC chan struct{} // channel to indicate request for new items for iteration
+ itemsC chan []*item // channel to pass batch of iteration between feed loop and batch write loop
+ waiting sync.Map // stores items in memory while waiting for proof response
+ putC chan *item // channel to pass items to the listen loop from the Put API call
+ receiptsC chan storage.Address // channel to receive completed items
+ chunkC chan *item // buffer fed from db iteration and consumed by the main loop
+ quit chan struct{} // channel to signal quitting on all loops
+ dbquit chan struct{} // channel to signal batch was written and db can be closed
+ index int64 // ever incrementing storage index
+ size int64 // number of items
+ depthFunc func() uint // callback returning the current kademlia neighbourhood depth
+ depthC chan uint // channel to receive kademlia neighbourhood depth updates
+}
+
+type storeBatch struct {
+ *leveldb.Batch
+ toDelete []string
+ new int
+}
+
+// NewDB constructs a DB
+func NewDB(dbpath string, store storage.ChunkStore, f func(storage.Chunk) error, receiptsC chan storage.Address, depthFunc func() uint) (*DB, error) {
+
+ ldb, err := storage.NewLDBDatabase(dbpath)
+ if err != nil {
+ return nil, err
+ }
+ db := &DB{
+ db: ldb,
+ chunkStore: store,
+ tags: newTags(),
+ batchC: make(chan *storeBatch, 1),
+ requestC: make(chan struct{}, 1),
+ itemsC: make(chan []*item),
+ putC: make(chan *item),
+ depthC: make(chan uint),
+ depthFunc: depthFunc,
+ receiptsC: receiptsC,
+ chunkC: make(chan *item, bufferSize),
+ quit: make(chan struct{}),
+ dbquit: make(chan struct{}),
+ }
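+ // restore the persisted storage index and item count from a previous session (zero on a fresh db)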
+ db.index = db.getInt(indexKey)
+ db.size = db.getInt(sizeKey)
+ go db.listen()
+ go db.feedBuffer()
+ go db.writeBatches()
+ go db.iter(f)
+ return db, nil
+}
+
+// Put queues the item for batch insertion
+func (db *DB) Put(i *item) {
+ db.putC <- i
+ db.tags.Inc(i.Tag, STORED)
+}
+
+// Close terminates the loops by closing the quit channel, waits for the final batch to be written and closes the underlying database
+func (db *DB) Close() {
+ close(db.quit)
+ <-db.dbquit
+ db.db.Close()
+}
+
+// Size returns the number of items currently in the DB
+func (db *DB) Size() int64 {
+ return db.getInt(sizeKey)
+}
+
+// listen consumes put and receipt events until quit and collects them into a write batch
+func (db *DB) listen() {
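+ // depth starts at the maximum value (256) so that effectively no chunk is treated as falling within the neighbourhood until a depth update arrives on depthC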
+ var depth uint = 256
+ batch := &storeBatch{Batch: new(leveldb.Batch)}
+ for {
+ select {
+ case <-db.quit:
+ // make sure batch is saved to disk so as not to lose chunks
+ db.batchC <- batch
+ close(db.batchC)
+ return
+
+ case depth = <-db.depthC:
+
+ case item := <-db.putC:
+ // consume putC for insertion
+ // we can assume no duplicates are sent
+
+ key := db.newKey()
+ // remember the key on the item so that a receipt arriving before the item is re-read from the db can delete it
+ item.key = key
+ batch.Put(key, item.bytes())
+ batch.Put(indexKey, key[1:])
+ db.size++
+ batch.new++
+ if item.PO >= depth {
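+ // the chunk falls within this node's own neighbourhood: park the item in the waiting map
+ // and signal a receipt for it asynchronously so it gets marked SYNCED without being pushed to a peer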
+ go func() {
+ db.waiting.Store(item.Addr.Hex(), item)
+ db.receiptsC <- item.Addr
+ }()
+ // continue
+ }
+
+ case addr := <-db.receiptsC:
+ log.Warn("received receipt", "addr", label(addr[:]))
+ // consume receiptsC for removal
+ // potential duplicates
+ v, ok := db.waiting.Load(addr.Hex())
+ if !ok {
+ // already deleted
+ continue
+ }
+ it := v.(*item)
+ // in case we receive twice within one batch
+ if it.state == SYNCED {
+ continue
+ }
+ log.Warn("Synced", "addr", label(addr[:]))
+ it.state = SYNCED
+ db.tags.Inc(it.Tag, SYNCED)
+ batch.Delete(it.key)
+ db.size--
+ batch.toDelete = append(batch.toDelete, addr.Hex())
+ }
+ batch.Put(sizeKey, int64ToBytes(db.size))
+
+ // if batchwriter loop is idle, hand over the batch and create a new one
+ // if batchwriter loop is busy, keep adding to the same batch
+ select {
+ case db.batchC <- batch:
+ batch = &storeBatch{Batch: new(leveldb.Batch)}
+ default:
+ }
+ }
+}
+
+// writeBatches is a forever loop that updates the database in batches
+// of insertions and deletions;
+// whenever needed, after the update an iteration is performed to gather items
+func (db *DB) writeBatches() {
+ var from uint64 // start cursor for db iteration
+ // the batch start offset is only reset to 0 once retryInterval has passed since the last retrieval;
+ // if it were reset on every iteration, items would spuriously get retrieved
+ // multiple times before retryInterval has elapsed
+ timer := time.NewTimer(retryInterval)
+ defer timer.Stop()
+ var timerC <-chan time.Time
+ var requestC chan struct{}
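+ // requestC stays nil (disabling the request case below) until a batch containing new items has been written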
+ items := make([]*item, batchSize)
+
+ for {
+ select {
+ case batch := <-db.batchC:
+ if batch == nil {
+ close(db.dbquit)
+ return
+ }
+ // consume batches passed by listener if there were new updates
+ // write out the batch of updates collected
+ err := db.db.Write(batch.Batch)
+ if err != nil {
+ panic(err.Error())
+ }
+ // delete items from the waiting list
+ for _, addr := range batch.toDelete {
+ db.waiting.Delete(addr)
+ }
+ // if there are new items, allow iter batch requests
+ if batch.new > 0 {
+ requestC = db.requestC
+ }
+ continue
+ case <-requestC:
+ // accept requests to gather items
+
+ case <-timerC:
+ // start index from is reset to 0 if retryInterval time passed
+ from = 0
+ timerC = nil
+ requestC = db.requestC
+ continue
+ }
+
+ origfrom := from
+ size := 0
+ // retrieve maximum batchSize items from db in order of storage
+ // starting from index from
+ from = db.iterate(from, func(val *item) bool {
+ // every deserialised item is put in the batch
+ items[size] = val
+ // entry is created in the waiting map
+ db.waiting.Store(val.Addr.Hex(), val)
+ // increment size and return when the batch is filled
+ size++
+ return size < batchSize
+ })
+ // nothing retrieved: hand the request token back and disable requests until new items are written or the retry timer fires
+ if size == 0 {
+ db.requestC <- struct{}{} // this cannot block
+ requestC = nil
+ if origfrom == 0 {
+ timerC = nil
+ } else {
+ timerC = timer.C
+ }
+ continue
+ }
+ if origfrom == 0 {
+ timer.Reset(retryInterval)
+ timerC = timer.C
+ }
+ db.itemsC <- items[:size]
+ from++
+ }
+}
+
+// feedBuffer is a forever loop that feeds the buffered chunk channel from batches of DB retrievals
+// - batchSize determines the maximum number of hashes read from the database in one go
+// - bufferSize determines the maximum number of chunks to consume while a batch is being retrieved
+// - retryInterval is the period we wait for the storage proof of a chunk before we retry syncing it
+func (db *DB) feedBuffer() {
+ var items []*item
+ // feed buffer from DB
+ for {
+ // once the previous batch has been fully written to the buffer,
+ // request a new batch of items for iteration
+ db.requestC <- struct{}{} // does not block
+ items = <-db.itemsC // blocks until there are items to be read
+ log.Trace("reading batch to buffer", "size", len(items))
+
+ for _, next := range items {
+ if err := db.getChunk(next); err != nil {
+ log.Warn("chunk not found ... skipping and removing")
+ db.receiptsC <- next.Addr
+ continue
+ }
+
+ // feed the item to the chunk buffer of the db
+ select {
+ case db.chunkC <- next:
+ case <-db.quit:
+ // closing the buffer so that iter loop is terminated
+ close(db.chunkC)
+ return
+ }
+ }
+ }
+}
+
+// getChunk fills in the chunk field of the item using chunkStore to retrieve by address
+func (db *DB) getChunk(next *item) error {
+ if next.chunk != nil {
+ return nil
+ }
+ // retrieve the corresponding chunk if chunkStore is given
+ if db.chunkStore != nil {
+ chunk, err := db.chunkStore.Get(context.TODO(), next.Addr)
+ if err != nil {
+ return err
+ }
+ next.chunk = chunk
+ return nil
+ }
+ // otherwise create a fake chunk with only an address
+ next.chunk = storage.NewChunk(next.Addr, nil)
+ return nil
+}
+
+// iter consumes chunks from the buffer and calls f on each chunk,
+// unless f was already called on it less than retryInterval ago
+func (db *DB) iter(f func(storage.Chunk) error) {
+ for c := range db.chunkC {
+ addr := c.Addr
+ val, ok := db.waiting.Load(addr.Hex())
+ // skip if deleted
+ if !ok {
+ continue
+ }
+ c = val.(*item)
+ state := State(atomic.LoadUint32(&c.state))
+ // if deleted since retrieved or already asked not too long ago
+ if state == SYNCED || state == SENT && c.sentAt.Add(retryInterval).After(time.Now()) {
+ continue
+ }
+ if c.chunk == nil {
+ err := db.getChunk(c)
+ if err != nil {
+ continue
+ }
+ }
+ if state != SENT {
+ c.state = SENT
+ db.tags.Inc(c.Tag, SENT)
+ }
+ f(c.chunk)
+ c.sentAt = time.Now()
+ // not to waste memory
+ c.chunk = nil
+ }
+}
+
+// iterate walks the stored items in order of their keys starting from since,
+// calling f on each deserialised item until f returns false; it returns the index of the last item visited
+func (db *DB) iterate(since uint64, f func(val *item) bool) uint64 {
+ it := db.db.NewIterator()
+ defer it.Release()
+ sincekey := make([]byte, 9)
+ binary.BigEndian.PutUint64(sincekey[1:], since)
+ for ok := it.Seek(sincekey); ok; ok = it.Next() {
+ key := it.Key()
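+ // item keys are prefixed with a 0 byte; any other prefix belongs to the index/size counters, which terminate the ordered scan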
+ if key[0] != byte(0) {
+ break
+ }
+ // deserialise the stored value as an item
+ var val item
+ err := rlp.DecodeBytes(it.Value(), &val)
+ if err != nil {
+ panic(err.Error())
+ }
+ // remember the key
+ val.key = make([]byte, 9)
+ copy(val.key, key)
+ val.state = STORED
+ since = binary.BigEndian.Uint64(key[1:])
+ // call the function on the value, continue if it returns true
+ if !f(&val) {
+ break
+ }
+ }
+ return since
+}
+
+// getInt retrieves a counter from the db and deserialises it as an int64;
+// used for the storage index and the entry count
+func (db *DB) getInt(key []byte) int64 {
+ b, err := db.db.Get(key)
+ if err != nil {
+ return 0
+ }
+ return int64(binary.BigEndian.Uint64(b))
+}
+
+// int64ToBytes serialises an int64 to bytes using big-endian encoding
+func int64ToBytes(n int64) []byte {
+ key := make([]byte, 8)
+ binary.BigEndian.PutUint64(key, uint64(n))
+ return key
+}
diff --git a/swarm/syncer/db_test.go b/swarm/syncer/db_test.go
new file mode 100644
index 0000000000..a9806f01d9
--- /dev/null
+++ b/swarm/syncer/db_test.go
@@ -0,0 +1,248 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "encoding/binary"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ colorable "github.com/mattn/go-colorable"
+)
+
+var (
+ loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+)
+
+func init() {
+ flag.Parse()
+ log.PrintOrigins(true)
+ log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+}
+
+// TestDBIteration tests the correct behaviour of DB when inserting n chunks,
+// with a mocked proof response model: after the sync function is called on a chunk,
+// its hash appears on the receiptsC channel after a random delay
+// The test checks that:
+// - the sync function is called on chunks in order of insertion (FIFO)
+// - repeated calls on a chunk happen no earlier than retryInterval after the previous one
+// - already synced chunks are not resynced
+// - if no more data is inserted, the db is emptied shortly
+func TestDBIteration(t *testing.T) {
+ timeout := 30 * time.Second
+ chunkCnt := 10000
+
+ receiptsC := make(chan storage.Address)
+ chunksSentAt := make([]*time.Time, chunkCnt)
+
+ errc := make(chan error)
+ quit := make(chan struct{})
+ defer close(quit)
+ errf := func(s string, vals ...interface{}) {
+ select {
+ case errc <- fmt.Errorf(s, vals...):
+ case <-quit:
+ }
+ }
+
+ var max uint64 // the highest index sent so far
+ var complete int64 // number of chunks that got poc response
+ // sync function is not called concurrently, so max needs no lock
+ // TODO: chunksSentAt array should use a lock
+ syncf := func(chunk storage.Chunk) error {
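+ // the first 8 bytes of the chunk address encode the insertion order (see the feeding loop below)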
+ cur := binary.BigEndian.Uint64(chunk.Address()[:8])
+ if cur > max+1 {
+ errf("incorrect order of chunks from db chunk #%d before #%d", cur, max+1)
+ }
+ now := time.Now()
+ if cur < max+1 {
+ sentAt := chunksSentAt[cur-1]
+ if sentAt == nil {
+ errf("resyncing already synced chunk #%d: %v", cur, sentAt)
+ return nil
+ }
+ if (*sentAt).Add(retryInterval).After(now) {
+ errf("resync chunk #%d too early", cur)
+ return nil
+ }
+ } else {
+ max = cur
+ }
+ chunksSentAt[cur-1] = &now
+ // this goroutine mimics the chunk sync - poc response roundtrip
+ // with random delay (uniform within a fixed range)
+ go func() {
+ n := rand.Intn(1000)
+ delay := time.Duration(n+5) * time.Millisecond
+ ctx, cancel := context.WithTimeout(context.TODO(), delay)
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ case <-quit:
+ return
+ }
+ receiptsC <- chunk.Address()
+ chunksSentAt[cur-1] = nil
+ delCnt := atomic.AddInt64(&complete, 1)
+ if int(delCnt) == chunkCnt {
+ close(errc)
+ return
+ }
+ }()
+ return nil
+ }
+
+ // initialise db, it starts all the go routines
+ dbpath, err := ioutil.TempDir(os.TempDir(), "syncertest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dbpath)
+ db, err := NewDB(dbpath, nil, syncf, receiptsC, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer db.Close()
+
+ // feed fake chunks into the db, hashes encode the order so that
+ // it can be traced
+ go func() {
+ for i := 1; i <= chunkCnt; i++ {
+ addr := make([]byte, 32)
+ binary.BigEndian.PutUint64(addr, uint64(i))
+ c := &item{
+ Addr: addr,
+ }
+ db.Put(c)
+ }
+ }()
+
+ // wait on errc for errors on any thread or close if success
+ // otherwise time out
+ select {
+ case err := <-errc:
+ if err != nil {
+ t.Fatal(err)
+ }
+ case <-time.After(timeout):
+ t.Fatalf("timeout")
+ }
+
+ err = waitTillEmpty(db)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestDBCompleteMultipleSessions tests that the DB completes syncing correctly while chunks are
+// constantly inserted and the db is repeatedly closed and reopened across sessions
+func TestDBCompleteMultipleSessions(t *testing.T) {
+ chunkCnt := 1000
+
+ receiptsC := make(chan storage.Address)
+ quit := make(chan struct{})
+ defer close(quit)
+ sync := func(chunk storage.Chunk) error {
+
+ // this goroutine mimics the chunk sync - poc response roundtrip
+ // with random delay (uniform within a fixed range)
+ go func() {
+ n := rand.Intn(1000)
+ delay := time.Duration(n+5) * time.Millisecond
+ ctx, cancel := context.WithTimeout(context.TODO(), delay)
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ receiptsC <- chunk.Address()
+ case <-quit:
+ }
+
+ }()
+ return nil
+ }
+ // initialise db, it starts all the go routines
+ dbpath, err := ioutil.TempDir(os.TempDir(), "syncertest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dbpath)
+ db, err := NewDB(dbpath, nil, sync, receiptsC, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // feed fake chunks into the db, hashes encode the order so that
+ // it can be traced
+ i := 1
+ sessionChunkCnt := 10
+ var round int
+ ticker := time.NewTicker(10 * time.Microsecond)
+ defer ticker.Stop()
+ for range ticker.C {
+
+ db.Put(&item{Addr: network.RandomAddr().OAddr})
+ i++
+ if i > chunkCnt {
+ break
+ }
+ if i > sessionChunkCnt {
+ round++
+ log.Warn("session ends", "round", round, "chunks", i, "unsynced", db.Size())
+ db.Close()
+ db, err = NewDB(dbpath, nil, sync, receiptsC, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ sessionChunkCnt += rand.Intn(100)
+ }
+ }
+ err = waitTillEmpty(db)
+ if err != nil {
+ t.Fatal(err)
+ }
+ db.Close()
+}
+
+func waitTillEmpty(db *DB) error {
+ checkticker := time.NewTicker(50 * time.Millisecond)
+ defer checkticker.Stop()
+ round := 0
+ for range checkticker.C {
+ size := db.Size()
+ if size == 0 {
+ break
+ }
+ if round > 50 {
+ return fmt.Errorf("timeout waiting for db size 0, got %v", size)
+ }
+ round++
+ }
+ return nil
+}
diff --git a/swarm/syncer/protocol.go b/swarm/syncer/protocol.go
new file mode 100644
index 0000000000..23511a0921
--- /dev/null
+++ b/swarm/syncer/protocol.go
@@ -0,0 +1,121 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "crypto/rand"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// dispatcher makes sure newly stored chunks make it to the neighbourhood where they
+// can be retrieved, i.e. to nodes whose area of responsibility includes the chunk's address;
+// it gathers proof of custody responses, validates them and signals that the chunk is synced
+type dispatcher struct {
+ baseAddr storage.Address // base address to use in proximity calculation
+ sendChunkMsg func(*chunkMsg) error // function to send chunk msg
+ processReceipt func(storage.Address) error // function to process receipt for a chunk
+}
+
+// newDispatcher constructs a new node-wise dispatcher
+func newDispatcher(baseAddr storage.Address) *dispatcher {
+ return &dispatcher{
+ baseAddr: baseAddr,
+ }
+}
+
+// chunkMsg is the message construct to send chunks to their local neighbourhood
+type chunkMsg struct {
+ Addr []byte
+ Data []byte
+ Origin []byte
+ Nonce []byte
+}
+
+// sendChunk is called on incoming chunks that are to be synced
+func (s *dispatcher) sendChunk(ch storage.Chunk) error {
+ nonce := newNonce()
+ // TODO: proofs for the nonce should be generated and saved
+ msg := &chunkMsg{
+ Origin: s.baseAddr,
+ Addr: ch.Address()[:],
+ Data: ch.Data(),
+ Nonce: nonce,
+ }
+ return s.sendChunkMsg(msg)
+}
+
+// newNonce creates a random nonce;
+// even without POC it is important, since otherwise a resent chunk would be deduplicated by pss
+func newNonce() []byte {
+ buf := make([]byte, 32)
+ t := 0
+ for t < len(buf) {
+ n, _ := rand.Read(buf[t:])
+ t += n
+ }
+ return buf
+}
+
+// receiptMsg is a statement of custody response to a nonce on a chunk
+// it is currently a notification only, contains no proof
+type receiptMsg struct {
+ Addr []byte
+ Nonce []byte
+}
+
+// handleReceipt is called by the pss dispatcher on pssReceiptTopic msgs;
+// it passes the receipt's chunk address on to processReceipt
+func (s *dispatcher) handleReceipt(msg *receiptMsg, p *p2p.Peer) error {
+ return s.processReceipt(msg.Addr)
+}
+
+// storer makes sure that chunks sent to it that fall within its area of responsibility
+// are stored and synced to its nearest neighbours, and issues a receipt as a response
+// to the originator
+type storer struct {
+ chunkStore storage.ChunkStore // store to put chunks in, and retrieve them
+ sendReceiptMsg func(to []byte, r *receiptMsg) error
+}
+
+// newStorer constructs a new node-wise storer
+func newStorer(chunkStore storage.ChunkStore) *storer {
+ s := &storer{
+ chunkStore: chunkStore,
+ }
+ return s
+}
+
+// handleChunk is called by the pss dispatcher on pssChunkTopic msgs
+// only if the chunk falls within the node's area of responsibility
+func (s *storer) handleChunk(msg *chunkMsg, p *p2p.Peer) error {
+ // TODO: double check if it falls in area of responsibility
+ ch := storage.NewChunk(msg.Addr, msg.Data)
+ err := s.chunkStore.Put(context.TODO(), ch)
+ if err != nil {
+ return err
+ }
+ // TODO: check if originator or relayer is a nearest neighbour then return
+ // otherwise send back receipt
+ r := &receiptMsg{
+ Addr: msg.Addr,
+ Nonce: msg.Nonce,
+ }
+ return s.sendReceiptMsg(msg.Origin, r)
+}
diff --git a/swarm/syncer/protocol_test.go b/swarm/syncer/protocol_test.go
new file mode 100644
index 0000000000..00ce04ba10
--- /dev/null
+++ b/swarm/syncer/protocol_test.go
@@ -0,0 +1,140 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "bytes"
+ "context"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TestDispatcherSendChunk tests that when dispatcher.sendChunk is called,
+// the corresponding chunk message is passed to sendChunkMsg
+func TestDispatcherSendChunk(t *testing.T) {
+ baseAddr := network.RandomAddr().OAddr
+ s := newDispatcher(baseAddr)
+ timeout := time.NewTimer(100 * time.Millisecond)
+ var chmsg *chunkMsg
+ called := make(chan *chunkMsg)
+ s.sendChunkMsg = func(msg *chunkMsg) error {
+ called <- msg
+ return nil
+ }
+
+ chunk := storage.GenerateRandomChunk(100)
+ addr := chunk.Address()
+ chunkData := chunk.Data()
+ go s.sendChunk(chunk)
+ select {
+ case chmsg = <-called:
+ case <-timeout.C:
+ t.Fatal("timeout waiting for chunk message on channel")
+ }
+ if !bytes.Equal(chmsg.Addr, addr) {
+ t.Fatalf("expected chunk message address %v, got %v", addr, chmsg.Addr)
+ }
+ if !bytes.Equal(chmsg.Origin, baseAddr) {
+ t.Fatalf("expected Origin address %v, got %v", baseAddr, chmsg.Origin)
+ }
+ if !bytes.Equal(chmsg.Data, chunkData) {
+ t.Fatalf("expected chunk message data %v, got %v", chunkData, chmsg.Data)
+ }
+ if len(chmsg.Nonce) != 32 {
+ t.Fatalf("expected nonce to be 32 bytes long, got %v", len(chmsg.Nonce))
+ }
+}
+
+// TestDispatcherHandleProof tests that if handleReceipt is called with a receipt message
+// then processReceipt is called with the chunk address
+func TestDispatcherHandleProof(t *testing.T) {
+ baseAddr := network.RandomAddr().OAddr
+ s := newDispatcher(baseAddr)
+ timeout := time.NewTimer(100 * time.Millisecond)
+ called := make(chan storage.Address)
+ s.processReceipt = func(a storage.Address) error {
+ called <- a
+ return nil
+ }
+
+ chunk := storage.GenerateRandomChunk(100)
+ addr := chunk.Address()
+ nonce := newNonce()
+ msg := &receiptMsg{addr, nonce}
+ peer := p2p.NewPeer(enode.ID{}, "", nil)
+ var next []byte
+ go s.handleReceipt(msg, peer)
+ select {
+ case next = <-called:
+ case <-timeout.C:
+ t.Fatal("timeout waiting for receipt address on channel")
+ }
+ if !bytes.Equal(next, addr) {
+ t.Fatalf("expected receipt address %v, got %v", addr, next)
+ }
+}
+
+// TestStorerHandleChunk tests that if storer.handleChunk is called then the
+// chunk gets stored and a receipt is created and passed to sendReceiptMsg
+func TestStorerHandleChunk(t *testing.T) {
+ // set up storer
+ origin := network.RandomAddr().OAddr
+ chunkStore := storage.NewMapChunkStore()
+ s := newStorer(chunkStore)
+ timeout := time.NewTimer(100 * time.Millisecond)
+ called := make(chan *receiptMsg)
+ var destination []byte
+ s.sendReceiptMsg = func(to []byte, msg *receiptMsg) error {
+ // record the destination before signalling on called to avoid a data race with the test goroutine reading it
+ destination = to
+ called <- msg
+ return nil
+ }
+ // create a chunk message and call handleChunk on it
+ chunk := storage.GenerateRandomChunk(100)
+ addr := chunk.Address()
+ data := chunk.Data()
+ peer := p2p.NewPeer(enode.ID{}, "", nil)
+ nonce := newNonce()
+ chmsg := &chunkMsg{
+ Origin: origin,
+ Addr: addr,
+ Data: data,
+ Nonce: nonce,
+ }
+ go s.handleChunk(chmsg, peer)
+
+ var r *receiptMsg
+ select {
+ case r = <-called:
+ case <-timeout.C:
+ t.Fatal("timeout waiting for chunk message on channel")
+ }
+ if _, err := chunkStore.Get(context.TODO(), addr); err != nil {
+ t.Fatalf("expected chunk with address %v to be stored in chunkStore", addr)
+ }
+ if !bytes.Equal(destination, origin) {
+ t.Fatalf("expected destination to equal origin %v, got %v", origin, destination)
+ }
+ if !bytes.Equal(r.Addr, addr) {
+ t.Fatalf("expected receipt msg address to be chunk address %v, got %v", addr, r.Addr)
+ }
+}
diff --git a/swarm/syncer/pubsub.go b/swarm/syncer/pubsub.go
new file mode 100644
index 0000000000..2ff30cca7c
--- /dev/null
+++ b/swarm/syncer/pubsub.go
@@ -0,0 +1,111 @@
+package syncer
+
+import (
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/swarm/pss"
+)
+
+const (
+ pssChunkTopic = "SYNC" // pss topic for chunks
+ pssReceiptTopic = "STOC" // pss topic for statement of custody receipts
+)
+
+// PubSub is a Postal Service interface needed to send/receive chunks, send/receive proofs
+type PubSub interface {
+ Register(topic string, handler func(msg []byte, p *p2p.Peer) error)
+ Send(to []byte, topic string, msg []byte) error
+}
+
+// Pss implements the PubSub interface using pss
+type Pss struct {
+ pss *pss.Pss // pss
+ prox bool // determines if pss send should use neighbourhood addressing
+}
+
+// NewPss creates a new Pss
+func NewPss(p *pss.Pss, prox bool) *Pss {
+ return &Pss{
+ pss: p,
+ prox: prox,
+ }
+}
+
+// Register registers a handler
+func (p *Pss) Register(topic string, handler func(msg []byte, p *p2p.Peer) error) {
+ f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
+ return handler(msg, peer)
+ }
+ h := pss.NewHandler(f).WithRaw()
+ if p.prox {
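+ // with prox enabled the handler also receives messages whose destination merely falls within
+ // this node's area of responsibility (neighbourhood addressing) rather than requiring an exact address match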
+ h = h.WithProxBin()
+ }
+ pt := pss.BytesToTopic([]byte(topic))
+ p.pss.Register(&pt, h)
+}
+
+// Send sends a message using pss SendRaw
+func (p *Pss) Send(to []byte, topic string, msg []byte) error {
+ pt := pss.BytesToTopic([]byte(topic))
+ log.Warn("Send", "topic", topic, "to", label(to))
+ return p.pss.SendRaw(pss.PssAddress(to), pt, msg)
+}
+
+// withPubSub plugs PubSub into the storer for receiving chunks and sending receipts
+func (s *storer) withPubSub(ps PubSub) *storer {
+ // Registers handler on pssChunkTopic that deserialises chunkMsg and calls
+ // the storer's handleChunk function
+ ps.Register(pssChunkTopic, func(msg []byte, p *p2p.Peer) error {
+ var chmsg chunkMsg
+ err := rlp.DecodeBytes(msg, &chmsg)
+ if err != nil {
+ return err
+ }
+ log.Error("Handler", "chunk", label(chmsg.Addr), "origin", label(chmsg.Origin))
+ return s.handleChunk(&chmsg, p)
+ })
+
+ s.sendReceiptMsg = func(to []byte, r *receiptMsg) error {
+ msg, err := rlp.EncodeToBytes(r)
+ if err != nil {
+ return err
+ }
+ log.Error("send receipt", "addr", label(r.Addr), "to", label(to))
+ return ps.Send(to, pssReceiptTopic, msg)
+ }
+ return s
+}
+
+func label(b []byte) string {
+ return hexutil.Encode(b[:2])
+}
+
+func (s *dispatcher) withPubSub(ps PubSub) *dispatcher {
+ // Registers handler on pssReceiptTopic that deserialises receiptMsg and calls
+ // the dispatcher's handleReceipt function
+ ps.Register(pssReceiptTopic, func(msg []byte, p *p2p.Peer) error {
+ var prmsg receiptMsg
+ err := rlp.DecodeBytes(msg, &prmsg)
+ if err != nil {
+ return err
+ }
+ log.Error("Handler", "proof", label(prmsg.Addr), "self", label(s.baseAddr))
+ return s.handleReceipt(&prmsg, p)
+ })
+
+ // consumes outgoing chunk messages and sends them to their destination
+ // using neighbourhood addressing
+ s.sendChunkMsg = func(c *chunkMsg) error {
+ msg, err := rlp.EncodeToBytes(c)
+ if err != nil {
+ return err
+ }
+ log.Error("send chunk", "addr", label(c.Addr), "self", label(s.baseAddr))
+ return ps.Send(c.Addr[:], pssChunkTopic, msg)
+ }
+
+ return s
+}
diff --git a/swarm/syncer/pubsub_test.go b/swarm/syncer/pubsub_test.go
new file mode 100644
index 0000000000..0e80cdece9
--- /dev/null
+++ b/swarm/syncer/pubsub_test.go
@@ -0,0 +1,63 @@
+package syncer
+
+import (
+ "bytes"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// loopback implements PubSub as a central subscription engine,
+// ie a msg sent is received by all handlers registered for the topic
+type loopback struct {
+ handlers map[string][]func(msg []byte, p *p2p.Peer) error
+}
+
+// Register subscribes to a topic with a handler
+func (lb *loopback) Register(topic string, handler func(msg []byte, p *p2p.Peer) error) {
+ lb.handlers[topic] = append(lb.handlers[topic], handler)
+}
+
+// Send publishes a msg with a topic
+func (lb *loopback) Send(to []byte, topic string, msg []byte) error {
+ p := p2p.NewPeer(enode.ID{}, "", nil)
+ for _, handler := range lb.handlers[topic] {
+ if err := handler(msg, p); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// tests how the dispatcher of a push-syncing node communicates with storers via PubSub
+func TestProtocolWithLoopbackPubSub(t *testing.T) {
+ chunkCnt := 100
+ lb := &loopback{make(map[string][]func(msg []byte, p *p2p.Peer) error)}
+ d := newDispatcher(network.RandomAddr().OAddr).withPubSub(lb)
+ receiptsC := make(chan storage.Address, 1)
+ d.processReceipt = func(a storage.Address) error {
+ receiptsC <- a
+ return nil
+ }
+ chunkStore := storage.NewMapChunkStore()
+ newStorer(chunkStore).withPubSub(lb)
+ timeout := time.NewTimer(100 * time.Millisecond)
+ for i := 0; i < chunkCnt; i++ {
+ ch := storage.GenerateRandomChunk(int64(rand.Intn(chunk.DefaultSize)))
+ d.sendChunk(ch)
+ select {
+ case <-timeout.C:
+ t.Fatalf("timeout")
+ case addr := <-receiptsC:
+ if !bytes.Equal(addr[:], ch.Address()[:]) {
+ t.Fatalf("wrong address synced")
+ }
+ }
+ }
+}
diff --git a/swarm/syncer/syncer.go b/swarm/syncer/syncer.go
new file mode 100644
index 0000000000..aeacfd6ee2
--- /dev/null
+++ b/swarm/syncer/syncer.go
@@ -0,0 +1,72 @@
+package syncer
+
+import (
+ "context"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// Syncer binds together
+// - syncdb
+// - protocol (dispatcher)
+// - pubsub transport layer
+type Syncer struct {
+ db *DB // sync db
+ addr []byte // base address of the node, used for proximity calculation
+}
+
+// New constructs a Syncer
+func New(dbpath string, baseAddr storage.Address, store storage.ChunkStore, ps PubSub, depthFunc func() uint) (*Syncer, error) {
+ d := newDispatcher(baseAddr).withPubSub(ps)
+ receiptsC := make(chan storage.Address)
+ db, err := NewDB(dbpath, store, d.sendChunk, receiptsC, depthFunc)
+ if err != nil {
+ return nil, err
+ }
+ d.processReceipt = func(addr storage.Address) error {
+ receiptsC <- addr
+ return nil
+ }
+ return &Syncer{db: db, addr: baseAddr[:]}, nil
+}
+
+// Close closes the syncer
+func (s *Syncer) Close() {
+ s.db.Close()
+}
+
+// Put puts the chunk to storage and inserts it into the sync index;
+// the chunkstore call is currently synchronous, so this needs to
+// wrap the dbstore
+func (s *Syncer) Put(tagname string, chunk storage.Chunk) {
+ it := &item{
+ Addr: chunk.Address(),
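+ // proximity order of the chunk address relative to this node's base address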
+ PO: uint(storage.Proximity(chunk.Address(), s.addr)),
+ Tag: tagname,
+ chunk: chunk,
+ state: SPLIT, // SPLIT is the zero value, so this could be left implicit
+ }
+ s.db.tags.Inc(tagname, SPLIT)
+ // this put returns with error if this is a duplicate
+ err := s.db.chunkStore.Put(context.TODO(), chunk)
+ if err == errExists {
+ return
+ }
+ if err != nil {
+ log.Error("syncer: error storing chunk: %v", err)
+ return
+ }
+ s.db.Put(it)
+}
+
+// NewTag creates a new info object for a file/collection of chunks
+func (s *Syncer) NewTag(name string, total int) (*Tag, error) {
+ return s.db.tags.New(name, total)
+}
+
+// Status returns the count for the given state and the total count of chunks tagged with name
+func (s *Syncer) Status(name string, state State) (int, int) {
+ v, _ := s.db.tags.tags.Load(name)
+ return v.(*Tag).Status(state)
+}
diff --git a/swarm/syncer/syncer_test.go b/swarm/syncer/syncer_test.go
new file mode 100644
index 0000000000..cd10bc869b
--- /dev/null
+++ b/swarm/syncer/syncer_test.go
@@ -0,0 +1,417 @@
+package syncer
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/network/stream"
+ "github.com/ethereum/go-ethereum/swarm/pot"
+ "github.com/ethereum/go-ethereum/swarm/pss"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const neighbourhoodSize = 2
+
+// tests the syncer
+// tests how dispatcher of a push-syncing node interfaces with the sync db
+// communicate with storers via mock PubSub
+func TestSyncerWithLoopbackPubSub(t *testing.T) {
+ // mock pubsub messenger
+ lb := &loopback{make(map[string][]func(msg []byte, p *p2p.Peer) error)}
+
+ // initialise syncer
+ baseAddr := network.RandomAddr().OAddr
+ chunkStore := storage.NewMapChunkStore()
+ dbpath, err := ioutil.TempDir(os.TempDir(), "syncertest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dbpath)
+ s, err := New(dbpath, baseAddr, chunkStore, lb, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer s.Close()
+
+ // add a client storer and hook it to loopback pubsub
+ storerChunkStore := storage.NewMapChunkStore()
+ newStorer(storerChunkStore).withPubSub(lb)
+
+ // fill up with chunks
+ chunkCnt := 100
+ for i := 0; i < chunkCnt; i++ {
+ ch := storage.GenerateRandomChunk(int64(rand.Intn(chunk.DefaultSize)))
+ s.Put("test", ch)
+ }
+
+ err = waitTillEmpty(s.db)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+}
+
+// pubSubOracle implements a fake comms protocol to serve as pubsub; it dispatches chunks to the
+// nearest neighbours of the chunk address
+type pubSubOracle struct {
+ pot *pot.Pot
+ pof pot.Pof
+ chunkHandlers map[string]func(msg []byte, p *p2p.Peer) error
+ proofHandlers map[string]func(msg []byte, p *p2p.Peer) error
+}
+
+func newPubSubOracle() *pubSubOracle {
+ return &pubSubOracle{
+ pof: pot.DefaultPof(256),
+ chunkHandlers: make(map[string]func(msg []byte, p *p2p.Peer) error),
+ proofHandlers: make(map[string]func(msg []byte, p *p2p.Peer) error),
+ }
+}
+
+func (p *pubSubOracle) Send(to []byte, topic string, msg []byte) error {
+ addr := storage.Address(to)
+ peer := p2p.NewPeer(enode.ID{}, "", nil)
+ if topic == pssReceiptTopic {
+ f, ok := p.proofHandlers[addr.Hex()]
+ if !ok {
+ return fmt.Errorf("cannot send to %v", addr.Hex())
+ }
+ f(msg, peer)
+ return nil
+ }
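+ // chunk messages are delivered to the nearest neighbours of the chunk address, emulating neighbourhood addressing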
+ nns := p.getNNS(addr)
+ for _, nn := range nns {
+ f := p.chunkHandlers[storage.Address(nn).Hex()]
+ f(msg, peer)
+ }
+ return nil
+
+}
+
+func (p *pubSubOracle) getNNS(addr []byte) (peers [][]byte) {
+ if p.pot == nil {
+ p.initPOT()
+ }
+ n := 0
+ p.pot.EachNeighbour(addr, p.pof, func(v pot.Val, i int) bool {
+ peers = append(peers, v.([]byte))
+ n++
+ return n < neighbourhoodSize
+ })
+ return peers
+}
+
+func (p *pubSubOracle) initPOT() {
+ p.pot = pot.NewPot(nil, 0)
+ for k := range p.chunkHandlers {
+ addr := common.Hex2Bytes(k)
+ p.pot, _, _ = pot.Add(p.pot, addr, p.pof)
+ }
+}
+
+type pubsub struct {
+ *pubSubOracle
+ addr storage.Address
+}
+
+func (ps *pubsub) Register(topic string, handler func(msg []byte, p *p2p.Peer) error) {
+ if topic == pssReceiptTopic {
+ ps.pubSubOracle.proofHandlers[ps.addr.Hex()] = handler
+ return
+ }
+ ps.pubSubOracle.chunkHandlers[ps.addr.Hex()] = handler
+}
+
+func (p *pubSubOracle) new(b []byte) PubSub {
+ return &pubsub{pubSubOracle: p, addr: storage.Address(b)}
+}
+
+var (
+ bucketKeySyncer = simulation.BucketKey("syncer")
+)
+
+func upload(ctx context.Context, s *Syncer, tagname string, n int) (addrs []storage.Address, err error) {
+ tg, err := s.NewTag(tagname, n)
+ if err != nil {
+ return nil, err
+ }
+ for i := 0; i < n; i++ {
+ ch := storage.GenerateRandomChunk(int64(chunk.DefaultSize))
+ addrs = append(addrs, ch.Address())
+ s.Put(tagname, ch)
+ }
+ _ = tg
+ // err = tg.WaitTill(ctx, SYNCED)
+ time.Sleep(1 * time.Second)
+ if err != nil {
+ return nil, err
+ }
+ return addrs, nil
+}
+
+func download(ctx context.Context, s *Syncer, addrs []storage.Address) error {
+ errc := make(chan error)
+ for _, addr := range addrs {
+ go func(addr storage.Address) {
+ _, err := s.db.chunkStore.Get(ctx, addr)
+ select {
+ case errc <- err:
+ case <-ctx.Done():
+ }
+ }(addr)
+ }
+ i := 0
+ for err := range errc {
+ if err != nil {
+ return err
+ }
+ i++
+ if i == len(addrs) {
+ break
+ }
+ }
+ return nil
+}
+
+// tests syncing using the simulation framework
+// - the snapshot sets up a healthy kademlia
+// - pubSubOracle (offband delivery) mocks the transport layer of the syncing protocol to the neighbourhood
+// - uses real streamer retrieval requests to download
+// - it performs several trials concurrently
+// - in each trial an uploader and a downloader node are selected; the uploader uploads a number of chunks
+//   and after they are synced the downloader node retrieves the content
+// - there are no retries: if any of the downloads times out or fails, the test fails
+func TestSyncerWithPubSubOracle(t *testing.T) {
+ nodeCnt := 128
+ chunkCnt := 10
+ trials := 100
+
+ // offband syncing to nearest neighbourhood
+ psg := newPubSubOracle()
+ psSyncerF := func(addr []byte, _ *pss.Pss) PubSub {
+ return psg.new(addr)
+ }
+ err := testSyncerWithPubSub(nodeCnt, chunkCnt, trials, newServiceFunc(psSyncerF, nil))
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+// test syncer using pss
+func TestSyncerWithPss(t *testing.T) {
+ nodeCnt := 128
+ chunkCnt := 10
+ trials := 10
+ psSyncerF := func(_ []byte, p *pss.Pss) PubSub {
+ return NewPss(p, false)
+ }
+ psStorerF := func(_ []byte, p *pss.Pss) PubSub {
+ return NewPss(p, true)
+ }
+ err := testSyncerWithPubSub(nodeCnt, chunkCnt, trials, newServiceFunc(psSyncerF, psStorerF))
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testSyncerWithPubSub(nodeCnt, chunkCnt, trials int, sf simulation.ServiceFunc) error {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": sf,
+ })
+ defer sim.Close()
+
+ err := sim.UploadSnapshot(fmt.Sprintf("../network/stream/testing/snapshot_%d.json", nodeCnt))
+ if err != nil {
+ return err
+ }
+
+ choose2 := func(n int) (i, j int) {
+ i = rand.Intn(n)
+ j = rand.Intn(n - 1)
+ if j >= i {
+ j++
+ }
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel()
+ errc := make(chan error)
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ for i := 0; i < trials; i++ {
+ // if i%10 == 0 && i > 0 {
+ // time.Sleep(1000 * time.Millisecond)
+ // }
+ go func(i int) {
+ u, d := choose2(nodeCnt)
+ uid := sim.UpNodeIDs()[u]
+ val, _ := sim.NodeItem(uid, bucketKeySyncer)
+ syncer := val.(*Syncer)
+ syncer.db.depthC <- syncer.db.depthFunc()
+ did := sim.UpNodeIDs()[d]
+ tagname := fmt.Sprintf("tag-%v-%v-%d", label(uid[:]), label(did[:]), i)
+ log.Error("uploading", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+ what, err := upload(ctx, syncer, tagname, chunkCnt)
+ if err != nil {
+ select {
+ case errc <- err:
+ case <-ctx.Done():
+ return
+ }
+ return
+ }
+ log.Error("synced", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+ log.Error("downloading", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+
+ val, _ = sim.NodeItem(did, bucketKeySyncer)
+ syncer = val.(*Syncer)
+ err = download(ctx, syncer, what)
+ select {
+ case errc <- err:
+ case <-ctx.Done():
+ }
+ log.Error("downloaded", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+
+ }(i)
+ }
+ i := 0
+ for err := range errc {
+ if err != nil {
+ return err
+ }
+ i++
+ if i >= trials {
+ break
+ }
+ }
+ return nil
+ })
+
+ if result.Error != nil {
+ return fmt.Errorf("simulation error: %v", result.Error)
+ }
+ log.Error("PASS")
+ return nil
+}
+
+func newServiceFunc(psSyncer, psStorer func([]byte, *pss.Pss) PubSub) func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ return func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ n := ctx.Config.Node()
+ addr := network.NewAddr(n)
+ datadir, err := ioutil.TempDir(os.TempDir(), fmt.Sprintf("chunkstore-%s", n.ID().TerminalString()))
+ if err != nil {
+ return nil, nil, err
+ }
+ params := storage.NewDefaultLocalStoreParams()
+ params.ChunkDbPath = datadir
+ params.BaseKey = addr.Over()
+ localStore, err := storage.NewTestLocalStoreForAddr(params)
+ if err != nil {
+ os.RemoveAll(datadir)
+ return nil, nil, err
+ }
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // pss
+ kadParams := network.NewKadParams()
+ kadParams.MinProxBinSize = 2
+ kad := network.NewKademlia(addr.Over(), kadParams)
+ privKey, err := crypto.GenerateKey()
+ if err != nil {
+ return nil, nil, err
+ }
+ pssp := pss.NewPssParams().WithPrivateKey(privKey)
+ ps, err := pss.NewPss(kad, pssp)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // streamer
+ delivery := stream.NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ r := stream.NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &stream.RegistryOptions{
+ Syncing: stream.SyncingDisabled,
+ Retrieval: stream.RetrievalEnabled,
+ }, nil)
+
+ // set up syncer
+ dbpath, err := ioutil.TempDir(os.TempDir(), fmt.Sprintf("syncdb-%s", n.ID().TerminalString()))
+ if err != nil {
+ os.RemoveAll(datadir)
+ return nil, nil, err
+ }
+ p := psSyncer(addr.OAddr, ps)
+ depthFunc := func() uint {
+ return uint(kad.NeighbourhoodDepth())
+ }
+ syn, err := New(dbpath, addr.OAddr, netStore, p, depthFunc)
+ if err != nil {
+ os.RemoveAll(datadir)
+ os.RemoveAll(dbpath)
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeySyncer, syn)
+
+ // also work as a syncer storer client
+ if psStorer != nil {
+ p = psStorer(addr.OAddr, ps)
+ }
+ st := newStorer(netStore).withPubSub(p)
+ _ = st
+
+ cleanup = func() {
+ syn.Close()
+ netStore.Close()
+ os.RemoveAll(datadir)
+ os.RemoveAll(dbpath)
+ r.Close()
+ }
+
+ return &StreamerAndPss{r, ps}, cleanup, nil
+ }
+}
+
+// implements the node.Service interface
+type StreamerAndPss struct {
+ *stream.Registry
+ pss *pss.Pss
+}
+
+func (s *StreamerAndPss) Protocols() []p2p.Protocol {
+ return append(s.Registry.Protocols(), s.pss.Protocols()...)
+}
+
+func (s *StreamerAndPss) Start(srv *p2p.Server) error {
+ err := s.Registry.Start(srv)
+ if err != nil {
+ return err
+ }
+ return s.pss.Start(srv)
+}
+
+func (s *StreamerAndPss) Stop() error {
+ err := s.Registry.Stop()
+ if err != nil {
+ return err
+ }
+ return s.pss.Stop()
+}
diff --git a/swarm/syncer/tags.go b/swarm/syncer/tags.go
new file mode 100644
index 0000000000..2b2d66ae9e
--- /dev/null
+++ b/swarm/syncer/tags.go
@@ -0,0 +1,154 @@
+package syncer
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+)
+
+var (
+ errExists = errors.New("already exists")
+ errNoETA = errors.New("unable to calculate ETA")
+)
+
+// Tag represents info on the status of new chunks
+type Tag struct {
+ name string
+ total uint32 // total chunks belonging to a tag
+ split uint32 // number of chunks already processed by splitter for hashing
+ stored uint32 // number of chunks already stored locally
+ sent uint32 // number of chunks sent for push syncing
+ synced uint32 // number of chunks synced with proof
+ startedAt time.Time // time the tag was created, used to calculate the ETA
+ State chan State // channel to signal completion
+}
+
+// tags holds the tag infos indexed by name
+type tags struct {
+ tags *sync.Map
+}
+
+// newTags creates a tags object
+func newTags() *tags {
+ return &tags{
+ &sync.Map{},
+ }
+}
+
+// New creates a new tag, stores it by name and returns it;
+// it returns an error if a tag with this name already exists
+func (ts *tags) New(s string, total int) (*Tag, error) {
+ t := &Tag{
+ name: s,
+ startedAt: time.Now(),
+ total: uint32(total),
+ State: make(chan State, 5),
+ }
+ _, loaded := ts.tags.LoadOrStore(s, t)
+ if loaded {
+ return nil, errExists
+ }
+ return t, nil
+}
+
+// Inc increments the count for a state
+func (t *Tag) Inc(state State) {
+ var v *uint32
+ switch state {
+ case SPLIT:
+ v = &t.split
+ case STORED:
+ v = &t.stored
+ case SENT:
+ v = &t.sent
+ case SYNCED:
+ v = &t.synced
+ }
+ n := atomic.AddUint32(v, 1)
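+ // once the count reaches the total, signal completion of this state on the State channel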
+ if int(n) == t.GetTotal() {
+ t.State <- state
+ }
+}
+
+// Get returns the count for a state on a tag
+func (t *Tag) Get(state State) int {
+ var v *uint32
+ switch state {
+ case SPLIT:
+ v = &t.split
+ case STORED:
+ v = &t.stored
+ case SENT:
+ v = &t.sent
+ case SYNCED:
+ v = &t.synced
+ }
+ return int(atomic.LoadUint32(v))
+}
+
+// GetTotal returns the total count
+func (t *Tag) GetTotal() int {
+ return int(atomic.LoadUint32(&t.total))
+}
+
+// SetTotal sets the total count to the SPLIT count;
+// it is meant to be called when the splitter finishes, for input streams of unknown size
+func (t *Tag) SetTotal() int {
+ total := atomic.LoadUint32(&t.split)
+ atomic.StoreUint32(&t.total, total)
+ return int(total)
+}
+
+// Status returns the value of state and the total count
+func (t *Tag) Status(state State) (int, int) {
+ return t.Get(state), int(atomic.LoadUint32(&t.total))
+}
+
+// ETA returns the time of completion estimated based on time passed and rate of completion
+func (t *Tag) ETA(state State) (time.Time, error) {
+ cnt := t.Get(state)
+ total := t.GetTotal()
+ if cnt == 0 || total == 0 {
+ return time.Time{}, errNoETA
+ }
+ diff := time.Since(t.startedAt)
+ dur := time.Duration(total) * diff / time.Duration(cnt)
+ return t.startedAt.Add(dur), nil
+}
+
+// Inc increments the state count for a tag if tag is found
+func (ts *tags) Inc(s string, f State) {
+ t, ok := ts.tags.Load(s)
+ if !ok {
+ return
+ }
+ t.(*Tag).Inc(f)
+}
+
+// Get returns the state count for a tag
+func (ts *tags) Get(s string, f State) int {
+ t, _ := ts.tags.Load(s)
+ return t.(*Tag).Get(f)
+}
+
+// WaitTill blocks until the count for the given State reaches the total count
+func (tg *Tag) WaitTill(ctx context.Context, s State) error {
+ ticker := time.NewTicker(1 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case c := <-tg.State:
+ if c == s {
+ return nil
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ log.Error("Status", "name", tg.name, "SENT", tg.Get(SENT), "SYNCED", tg.Get(SYNCED))
+ }
+ }
+}
diff --git a/swarm/syncer/tags_test.go b/swarm/syncer/tags_test.go
new file mode 100644
index 0000000000..e815da071a
--- /dev/null
+++ b/swarm/syncer/tags_test.go
@@ -0,0 +1,178 @@
+package syncer
+
+import (
+ "context"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+var (
+ allStates = []State{SPLIT, STORED, SENT, SYNCED}
+)
+
+// TestTagSingleIncrements tests if Inc increments the tag state value
+func TestTagSingleIncrements(t *testing.T) {
+ tg := &Tag{total: 10}
+ for _, f := range allStates {
+ tg.Inc(f)
+ if tg.Get(f) != 1 {
+ t.Fatalf("not incremented")
+ }
+ cnt, total := tg.Status(f)
+ if cnt != 1 {
+ t.Fatalf("expected count 1 for state %v, got %v", f, cnt)
+ }
+ if total != 10 {
+ t.Fatalf("expected total count %v for state %v, got %v", 10, f, cnt)
+ }
+ }
+}
+
+// tests ETA is precise
+func TestTagETA(t *testing.T) {
+ now := time.Now()
+ maxDiff := 100000 // 100 microsecond
+ tg := &Tag{total: 10, startedAt: now}
+ time.Sleep(100 * time.Millisecond)
+ tg.Inc(SPLIT)
+ eta, err := tg.ETA(SPLIT)
+ if err != nil {
+ t.Fatal(err)
+ }
+ diff := time.Until(eta) - 9*time.Since(now)
+ if int(diff) > maxDiff || int(diff) < -maxDiff {
+ t.Fatalf("ETA is not precise, got diff %v > .1ms", diff)
+ }
+}
+
+// TestTagConcurrentIncrements tests Inc calls concurrently
+func TestTagConcurrentIncrements(t *testing.T) {
+ tg := &Tag{}
+ n := 1000
+ wg := sync.WaitGroup{}
+ wg.Add(4 * n)
+ for _, f := range allStates {
+ go func(f State) {
+ for j := 0; j < n; j++ {
+ go func() {
+ tg.Inc(f)
+ wg.Done()
+ }()
+ }
+ }(f)
+ }
+ wg.Wait()
+ for _, f := range allStates {
+ v := tg.Get(f)
+ if v != n {
+ t.Fatalf("expected state %v to be %v, got %v", f, n, v)
+ }
+ }
+}
+
+// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
+func TestTagsMultipleConcurrentIncrements(t *testing.T) {
+ ts := newTags()
+ n := 100
+ wg := sync.WaitGroup{}
+ wg.Add(10 * 4 * n)
+ for i := 0; i < 10; i++ {
+ s := string([]byte{uint8(i)})
+ ts.New(s, n)
+ for _, f := range allStates {
+ go func(s string, f State) {
+ for j := 0; j < n; j++ {
+ go func() {
+ ts.Inc(s, f)
+ wg.Done()
+ }()
+ }
+ }(s, f)
+ }
+ }
+ wg.Wait()
+ for i := 0; i < 10; i++ {
+ s := string([]byte{uint8(i)})
+ for _, f := range allStates {
+ v := ts.Get(s, f)
+ if v != n {
+ t.Fatalf("expected tag %v state %v to be %v, got %v", s, f, n, v)
+ }
+ }
+ }
+}
+
+// tests the correct behaviour of tags while using the DB
+func TestDBWithTags(t *testing.T) {
+ names := []string{"1", "2", "3", "4"}
+ receiptsC := make(chan storage.Address)
+ quit := make(chan struct{})
+ defer close(quit)
+ sync := func(chunk storage.Chunk) error {
+
+ // this goroutine mimics the chunk sync - poc response roundtrip
+ // with random delay (uniform within a fixed range)
+ go func() {
+ n := rand.Intn(1000)
+ delay := time.Duration(n+5) * time.Millisecond
+ ctx, cancel := context.WithTimeout(context.TODO(), delay)
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ receiptsC <- chunk.Address()
+ case <-quit:
+ }
+
+ }()
+ return nil
+ }
+ // initialise db, it starts all the go routines
+ dbpath, err := ioutil.TempDir(os.TempDir(), "syncertest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dbpath)
+ db, err := NewDB(dbpath, nil, sync, receiptsC, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer db.Close()
+
+ // feed fake chunks into the db, hashes encode the order so that
+ // it can be traced
+ for i, name := range names {
+ total := i*100 + 100
+ db.tags.New(name, total)
+ go func(name string, total int) {
+ for j := 0; j < total; j++ {
+ db.Put(&item{Addr: network.RandomAddr().OAddr, Tag: name})
+ }
+ }(name, total)
+ }
+
+ err = waitTillEmpty(db)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ states := []State{STORED, SENT, SYNCED}
+ var cnt int
+ for i, name := range names {
+ total := i*100 + 100
+ for _, state := range states {
+ cnt = db.tags.Get(name, state)
+ if cnt != total {
+ t.Fatalf("expected tag %v state %v to count %v, got %v", name, state, total, cnt)
+ }
+ }
+ }
+}