From 9946671e973b467bcecb2421db7f9487fe64eb3d Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 13:17:18 +0800 Subject: [PATCH 01/19] ledger: add failing test --- accounts.go | 7 +- ledger.go | 69 +++++++++-- nops_test.go | 52 +++++++++ snowball.go | 3 +- testutil.go | 323 +++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 440 insertions(+), 14 deletions(-) create mode 100644 nops_test.go create mode 100644 testutil.go diff --git a/accounts.go b/accounts.go index 7b65af0a..6e656c2b 100644 --- a/accounts.go +++ b/accounts.go @@ -21,13 +21,14 @@ package wavelet import ( "context" - "github.com/perlin-network/wavelet/avl" - "github.com/perlin-network/wavelet/store" - "github.com/pkg/errors" "sync" "sync/atomic" "time" "unsafe" + + "github.com/perlin-network/wavelet/avl" + "github.com/perlin-network/wavelet/store" + "github.com/pkg/errors" ) type Accounts struct { diff --git a/ledger.go b/ledger.go index 9f56ec31..c2da1e78 100644 --- a/ledger.go +++ b/ledger.go @@ -25,6 +25,11 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "math/rand" + "strings" + "sync" + "time" + "github.com/perlin-network/noise" "github.com/perlin-network/noise/skademlia" "github.com/perlin-network/wavelet/avl" @@ -36,10 +41,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/peer" - "math/rand" - "strings" - "sync" - "time" ) type Ledger struct { @@ -68,6 +69,9 @@ type Ledger struct { cacheChunks *LRU sendQuota chan struct{} + + finalizeCh chan struct{} + finalizeChLock sync.RWMutex } func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { @@ -77,7 +81,9 @@ func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { indexer := NewIndexer() accounts := NewAccounts(kv) - go accounts.GC(context.Background()) + + // TODO: disable GC only for test because it's causing test to panic + // go accounts.GC(context.Background()) rounds, err := NewRounds(kv, sys.PruningLimit) @@ -151,7 +157,8 @@ func (l *Ledger) AddTransaction(tx Transaction) error { if err != nil && errors.Cause(err) != ErrAlreadyExists { if !strings.Contains(errors.Cause(err).Error(), "transaction has no parents") { - fmt.Println(err) + logger := log.Node() + logger.Err(err).Msg("failed to add transaction") } return err } @@ -271,6 +278,36 @@ func (l *Ledger) Snapshot() *avl.Tree { return l.accounts.Snapshot() } +func (l *Ledger) BroadcastingNops() bool { + l.broadcastNopsLock.Lock() + broadcastNops := l.broadcastNops + defer l.broadcastNopsLock.Unlock() + + return broadcastNops +} + +// WaitForConsensus blocks until the ledger reaches consensus. +// It returns false if it took longer than the timeout duration. +func (l *Ledger) WaitForConsensus(timeout time.Duration) bool { + l.finalizeChLock.Lock() + ch := make(chan struct{}) + defer close(ch) + + l.finalizeCh = ch + l.finalizeChLock.Unlock() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-ch: + return true + + case <-timer.C: + return false + } +} + // BroadcastNop has the node send a nop transaction should they have sufficient // balance available. They are broadcasted if no other transaction that is not a nop transaction // is not broadcasted by the node after 500 milliseconds. These conditions only apply so long as @@ -376,7 +413,9 @@ func (l *Ledger) PullMissingTransactions() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) batch, err := client.DownloadTx(ctx, req) if err != nil { - fmt.Println("failed to download missing transactions:", err) + logger := log.Node() + logger.Err(err).Msg("failed to download missing transactions") + cancel() continue } @@ -387,12 +426,18 @@ func (l *Ledger) PullMissingTransactions() { for _, buf := range batch.Transactions { tx, err := UnmarshalTransaction(bytes.NewReader(buf)) if err != nil { - fmt.Printf("error unmarshaling downloaded tx [%v]: %+v", err, tx) + logger := log.Node() + logger.Err(err). + Hex("tx_id", tx.ID[:]). + Msg("error unmarshaling downloaded tx") continue } if err := l.AddTransaction(tx); err != nil && errors.Cause(err) != ErrMissingParents { - fmt.Printf("error adding downloaded tx to graph [%v]: %+v\n", err, tx) + logger := log.Node() + logger.Err(err). + Hex("tx_id", tx.ID[:]). + Msg("error adding downloaded tx to graph") continue } @@ -688,6 +733,12 @@ FINALIZE_ROUNDS: Msg("Finalized consensus round, and initialized a new round.") //go ExportGraphDOT(finalized, l.graph) + + l.finalizeChLock.RLock() + if l.finalizeCh != nil { + l.finalizeCh <- struct{}{} + } + l.finalizeChLock.RUnlock() } } diff --git a/nops_test.go b/nops_test.go new file mode 100644 index 00000000..25d09874 --- /dev/null +++ b/nops_test.go @@ -0,0 +1,52 @@ +package wavelet + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLedger_TransactionThroughput(t *testing.T) { + testnet := NewTestNetwork(t) + defer testnet.Cleanup() + + for i := 0; i < 20; i++ { + testnet.AddNode(t, 0) + } + + alice := testnet.AddNode(t, 1000000) + bob := testnet.AddNode(t, 0) + + txs := make([]Transaction, 1000) + var err error + + fmt.Println("adding transactions...") + for i := 0; i < len(txs); i++ { + txs[i], err = alice.Pay(bob, 1) + assert.NoError(t, err) + } + + timeout := time.NewTimer(time.Second * 60) + for { + select { + case <-timeout.C: + t.Fatal("timed out before all transactions are applied") + + case <-alice.WaitForConsensus(): + var appliedCount int + for _, tx := range txs { + if alice.Applied(tx) { + appliedCount++ + } + } + + fmt.Printf("%d/%d tx applied\n", appliedCount, len(txs)) + + if appliedCount == len(txs) { + return + } + } + } +} diff --git a/snowball.go b/snowball.go index 6f9e2eaa..829d365e 100644 --- a/snowball.go +++ b/snowball.go @@ -20,7 +20,6 @@ package wavelet import ( - "fmt" "sync" ) @@ -112,7 +111,7 @@ func (s *Snowball) Tick(round *Round) { if s.lastID != round.ID { // Handle termination case. if s.lastID != ZeroRoundID { - fmt.Printf("Snowball (%s) liveness fault: Last ID is %x with count %d, and new ID is %x.\n", s.name, s.lastID, s.count, round.ID) + //fmt.Printf("Snowball (%s) liveness fault: Last ID is %x with count %d, and new ID is %x.\n", s.name, s.lastID, s.count, round.ID) } s.lastID = round.ID diff --git a/testutil.go b/testutil.go new file mode 100644 index 00000000..7b1d4637 --- /dev/null +++ b/testutil.go @@ -0,0 +1,323 @@ +package wavelet + +import ( + "encoding/hex" + "fmt" + "net" + "strconv" + "sync" + "testing" + "time" + + "github.com/perlin-network/noise" + "github.com/perlin-network/noise/cipher" + "github.com/perlin-network/noise/edwards25519" + "github.com/perlin-network/noise/handshake" + "github.com/perlin-network/noise/skademlia" + "github.com/perlin-network/wavelet/store" + "github.com/perlin-network/wavelet/sys" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +type TestNetwork struct { + faucet *TestLedger + nodes []*TestLedger +} + +func NewTestNetwork(t testing.TB) *TestNetwork { + return &TestNetwork{ + faucet: NewTestFaucet(t), + nodes: []*TestLedger{}, + } +} + +func (n *TestNetwork) Cleanup() { + for _, node := range n.nodes { + node.Cleanup() + } + n.faucet.Cleanup() +} + +func (n *TestNetwork) AddNode(t testing.TB, startingBalance uint64) *TestLedger { + node := NewTestLedger(t, TestLedgerConfig{ + Peers: []string{n.faucet.Addr()}, + }) + n.nodes = append(n.nodes, node) + + if startingBalance > 0 { + _, err := n.faucet.Pay(node, startingBalance) + assert.NoError(t, err) + } + + return node +} + +func (n *TestNetwork) WaitForConsensus(t testing.TB) { + all := append(n.nodes, n.faucet) + TestWaitForConsensus(t, time.Second*30, all) +} + +type TestLedger struct { + ledger *Ledger + server *grpc.Server + addr string + kv store.KV + kvCleanup func() + finalized chan struct{} + stopped chan struct{} +} + +type TestLedgerConfig struct { + Wallet string + Peers []string +} + +func defaultConfig(t testing.TB) *TestLedgerConfig { + return &TestLedgerConfig{} +} + +// NewTestFaucet returns an account with a lot of PERLs. +func NewTestFaucet(t testing.TB) *TestLedger { + return NewTestLedger(t, TestLedgerConfig{ + Wallet: "87a6813c3b4cf534b6ae82db9b1409fa7dbd5c13dba5858970b56084c4a930eb400056ee68a7cc2695222df05ea76875bc27ec6e61e8e62317c336157019c405", + }) +} + +func NewTestLedger(t testing.TB, cfg TestLedgerConfig) *TestLedger { + keys := loadKeys(t, cfg.Wallet) + + ln, err := net.Listen("tcp", ":0") + assert.NoError(t, err) + + addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(ln.Addr().(*net.TCPAddr).Port)) + + client := skademlia.NewClient(addr, keys, skademlia.WithC1(sys.SKademliaC1), skademlia.WithC2(sys.SKademliaC2)) + client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) + + kv, cleanup := store.NewTestKV(t, "inmem", "db") + ledger := NewLedger(kv, client, nil) + server := client.Listen() + RegisterWaveletServer(server, ledger.Protocol()) + + stopped := make(chan struct{}) + go func() { + defer close(stopped) + if err := server.Serve(ln); err != nil && err != grpc.ErrServerStopped { + t.Fatal(err) + } + }() + + for _, addr := range cfg.Peers { + if _, err := client.Dial(addr); err != nil { + t.Fatal(err) + } + } + + client.Bootstrap() + + return &TestLedger{ + ledger: ledger, + server: server, + addr: addr, + kv: kv, + kvCleanup: cleanup, + stopped: stopped, + } +} + +func (l *TestLedger) Cleanup() { + l.server.GracefulStop() + <-l.stopped + + //l.kvCleanup() +} + +func (l *TestLedger) Addr() string { + return l.addr +} + +func (l *TestLedger) PublicKey() AccountID { + keys := l.ledger.client.Keys() + return keys.PublicKey() +} + +func (l *TestLedger) Balance() uint64 { + snapshot := l.ledger.Snapshot() + balance, _ := ReadAccountBalance(snapshot, l.PublicKey()) + return balance +} + +func (l *TestLedger) BalanceOfAccount(node *TestLedger) uint64 { + snapshot := l.ledger.Snapshot() + balance, _ := ReadAccountBalance(snapshot, node.PublicKey()) + return balance +} + +func (l *TestLedger) Stake() uint64 { + snapshot := l.ledger.Snapshot() + stake, _ := ReadAccountStake(snapshot, l.PublicKey()) + return stake +} + +func (l *TestLedger) StakeOfAccount(node *TestLedger) uint64 { + snapshot := l.ledger.Snapshot() + balance, _ := ReadAccountStake(snapshot, node.PublicKey()) + return balance +} + +func (l *TestLedger) Reward() uint64 { + snapshot := l.ledger.Snapshot() + reward, _ := ReadAccountReward(snapshot, l.PublicKey()) + return reward +} + +func (l *TestLedger) WaitForConsensus() <-chan bool { + ch := make(chan bool) + go func() { + ch <- l.ledger.WaitForConsensus(time.Second * 2) + }() + + return ch +} + +func (l *TestLedger) Nop() (Transaction, error) { + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagNop, nil), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) Pay(to *TestLedger, amount uint64) (Transaction, error) { + payload := Transfer{ + Recipient: to.PublicKey(), + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagTransfer, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) PlaceStake(amount uint64) (Transaction, error) { + payload := Stake{ + Opcode: sys.PlaceStake, + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagStake, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) WithdrawStake(amount uint64) (Transaction, error) { + payload := Stake{ + Opcode: sys.WithdrawStake, + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagStake, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) WithdrawReward(amount uint64) (Transaction, error) { + payload := Stake{ + Opcode: sys.WithdrawReward, + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagStake, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) FindTransaction(t testing.TB, id TransactionID) *Transaction { + return l.ledger.Graph().FindTransaction(id) +} + +func (l *TestLedger) Applied(tx Transaction) bool { + return tx.Depth <= l.ledger.graph.RootDepth() +} + +func TestWaitForConsensus(t testing.TB, timeout time.Duration, ledgers []*TestLedger) { + var wg sync.WaitGroup + for _, l := range ledgers { + wg.Add(1) + go func(ledger *TestLedger) { + defer wg.Done() + ledger.WaitForConsensus() + }(l) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.NewTimer(timeout) + select { + case <-done: + return + + case <-timer.C: + t.Fatal("consensus round took too long") + } +} + +// loadKeys returns a keypair from a wallet string, or generates a new one +// if no wallet is provided. +func loadKeys(t testing.TB, wallet string) *skademlia.Keypair { + // Generate a keypair if wallet is empty + if wallet == "" { + keys, err := skademlia.NewKeys(sys.SKademliaC1, sys.SKademliaC2) + assert.NoError(t, err) + return keys + } + + if len(wallet) != hex.EncodedLen(edwards25519.SizePrivateKey) { + t.Fatal(fmt.Errorf("private key is not of the right length")) + } + + var privateKey edwards25519.PrivateKey + n, err := hex.Decode(privateKey[:], []byte(wallet)) + if err != nil { + t.Fatal(err) + } + + if n != edwards25519.SizePrivateKey { + t.Fatal(fmt.Errorf("private key is not of the right length")) + } + + keys, err := skademlia.LoadKeys(privateKey, sys.SKademliaC1, sys.SKademliaC2) + if err != nil { + t.Fatal(err) + } + + return keys +} From 215145b58002cee31db93d88f055f4c2c891fd56 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 14:44:50 +0800 Subject: [PATCH 02/19] ledger: stop nop broadcast if depth > last added tx depth --- ledger.go | 24 +++++++++++++++--------- nops_test.go | 3 +++ testutil.go | 2 +- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/ledger.go b/ledger.go index c2da1e78..2294f5c5 100644 --- a/ledger.go +++ b/ledger.go @@ -58,9 +58,10 @@ type Ledger struct { consensus sync.WaitGroup - broadcastNops bool - broadcastNopsDelay time.Time - broadcastNopsLock sync.Mutex + broadcastNops bool + broadcastNopsMaxDepth uint64 + broadcastNopsDelay time.Time + broadcastNopsLock sync.Mutex sync chan struct{} syncVotes chan vote @@ -169,12 +170,13 @@ func (l *Ledger) AddTransaction(tx Transaction) error { l.gossiper.Push(tx) l.broadcastNopsLock.Lock() - if tx.Tag != sys.TagNop { + if tx.Tag != sys.TagNop && tx.Sender == l.client.Keys().PublicKey() { + l.broadcastNops = true l.broadcastNopsDelay = time.Now() - } - if tx.Sender == l.client.Keys().PublicKey() && l.finalizer.Preferred() == nil { - l.broadcastNops = true + if tx.Depth > l.broadcastNopsMaxDepth { + l.broadcastNopsMaxDepth = tx.Depth + } } l.broadcastNopsLock.Unlock() } @@ -278,7 +280,7 @@ func (l *Ledger) Snapshot() *avl.Tree { return l.accounts.Snapshot() } -func (l *Ledger) BroadcastingNops() bool { +func (l *Ledger) BroadcastingNop() bool { l.broadcastNopsLock.Lock() broadcastNops := l.broadcastNops defer l.broadcastNopsLock.Unlock() @@ -519,8 +521,12 @@ FINALIZE_ROUNDS: continue FINALIZE_ROUNDS } + // Only stop broadcasting nops if the most recently added transaction + // has been applied l.broadcastNopsLock.Lock() - l.broadcastNops = false + if l.broadcastNops && l.broadcastNopsMaxDepth <= l.graph.RootDepth() { + l.broadcastNops = false + } l.broadcastNopsLock.Unlock() workerChan := make(chan *grpc.ClientConn, 16) diff --git a/nops_test.go b/nops_test.go index 25d09874..7d113325 100644 --- a/nops_test.go +++ b/nops_test.go @@ -44,6 +44,9 @@ func TestLedger_TransactionThroughput(t *testing.T) { fmt.Printf("%d/%d tx applied\n", appliedCount, len(txs)) + assert.True(t, alice.ledger.BroadcastingNop(), + "node should not stop broadcasting nop while there are unapplied tx") + if appliedCount == len(txs) { return } diff --git a/testutil.go b/testutil.go index 7b1d4637..91f3f5c1 100644 --- a/testutil.go +++ b/testutil.go @@ -261,7 +261,7 @@ func (l *TestLedger) FindTransaction(t testing.TB, id TransactionID) *Transactio } func (l *TestLedger) Applied(tx Transaction) bool { - return tx.Depth <= l.ledger.graph.RootDepth() + return tx.Depth <= l.ledger.Graph().RootDepth() } func TestWaitForConsensus(t testing.TB, timeout time.Duration, ledgers []*TestLedger) { From 38e2e0849e8b2d32b92ea1846bab8fc761cd0f75 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 14:58:37 +0800 Subject: [PATCH 03/19] ledger: wait for faucet in test --- nops_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nops_test.go b/nops_test.go index 7d113325..91159159 100644 --- a/nops_test.go +++ b/nops_test.go @@ -19,6 +19,13 @@ func TestLedger_TransactionThroughput(t *testing.T) { alice := testnet.AddNode(t, 1000000) bob := testnet.AddNode(t, 0) + // Wait for alice to receive her PERL from the faucet + for <-alice.WaitForConsensus() { + if alice.Balance() > 0 { + break + } + } + txs := make([]Transaction, 1000) var err error From d8d424f2e09e4e7370d1daee581f5cb7b37c3121 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 15:06:19 +0800 Subject: [PATCH 04/19] ledger: fix panic due to closed channel --- ledger.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ledger.go b/ledger.go index 2294f5c5..3bc17dcd 100644 --- a/ledger.go +++ b/ledger.go @@ -293,8 +293,6 @@ func (l *Ledger) BroadcastingNop() bool { func (l *Ledger) WaitForConsensus(timeout time.Duration) bool { l.finalizeChLock.Lock() ch := make(chan struct{}) - defer close(ch) - l.finalizeCh = ch l.finalizeChLock.Unlock() @@ -306,6 +304,9 @@ func (l *Ledger) WaitForConsensus(timeout time.Duration) bool { return true case <-timer.C: + l.finalizeChLock.Lock() + l.finalizeCh = nil + l.finalizeChLock.Unlock() return false } } From 09d66bb8f6cc4174060d782417712be01318677d Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 15:28:38 +0800 Subject: [PATCH 05/19] ledger: check if nop broadcast is stopped in test --- nops_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/nops_test.go b/nops_test.go index 91159159..105e40f4 100644 --- a/nops_test.go +++ b/nops_test.go @@ -51,10 +51,14 @@ func TestLedger_TransactionThroughput(t *testing.T) { fmt.Printf("%d/%d tx applied\n", appliedCount, len(txs)) - assert.True(t, alice.ledger.BroadcastingNop(), - "node should not stop broadcasting nop while there are unapplied tx") + if appliedCount < len(txs) { + assert.True(t, alice.ledger.BroadcastingNop(), + "node should not stop broadcasting nop while there are unapplied tx") + } - if appliedCount == len(txs) { + // The test is successful if all tx are applied, + // and nop broadcasting is stopped once all tx are applied + if appliedCount == len(txs) && !alice.ledger.BroadcastingNop() { return } } From e8794adeba4c9a1815361b9fefda153448812b63 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 15:59:02 +0800 Subject: [PATCH 06/19] ledger: increase consensus timeout in test --- testutil.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testutil.go b/testutil.go index 91f3f5c1..056eb377 100644 --- a/testutil.go +++ b/testutil.go @@ -175,7 +175,7 @@ func (l *TestLedger) Reward() uint64 { func (l *TestLedger) WaitForConsensus() <-chan bool { ch := make(chan bool) go func() { - ch <- l.ledger.WaitForConsensus(time.Second * 2) + ch <- l.ledger.WaitForConsensus(time.Second * 10) }() return ch From c1ab853c41f6b27d826b54645ab91a8051cc6287 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 16:02:49 +0800 Subject: [PATCH 07/19] ledger: rename test and add comments --- nops_test.go => ledger_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) rename nops_test.go => ledger_test.go (80%) diff --git a/nops_test.go b/ledger_test.go similarity index 80% rename from nops_test.go rename to ledger_test.go index 105e40f4..f25b8ee6 100644 --- a/nops_test.go +++ b/ledger_test.go @@ -8,7 +8,14 @@ import ( "github.com/stretchr/testify/assert" ) -func TestLedger_TransactionThroughput(t *testing.T) { +// TestLedger_BroadcastNop checks that: +// +// * The ledger will keep broadcasting nop tx as long +// as there are unapplied tx (latestTxDepth <= rootDepth). +// +// * The ledger will stop broadcasting nop once there +// are no more unapplied tx. +func TestLedger_BroadcastNop(t *testing.T) { testnet := NewTestNetwork(t) defer testnet.Cleanup() @@ -26,6 +33,7 @@ func TestLedger_TransactionThroughput(t *testing.T) { } } + // Add lots of transactions txs := make([]Transaction, 1000) var err error From 975e2685d006bf23e13e0f9425e9ef5196243ca7 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 16:23:34 +0800 Subject: [PATCH 08/19] ledger: disable GC when running tests --- ledger.go | 15 +++++++++++++-- testutil.go | 2 +- utils_test.go | 2 +- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/ledger.go b/ledger.go index 3bc17dcd..69ef314d 100644 --- a/ledger.go +++ b/ledger.go @@ -28,6 +28,7 @@ import ( "math/rand" "strings" "sync" + "testing" "time" "github.com/perlin-network/noise" @@ -76,6 +77,15 @@ type Ledger struct { } func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { + return newLedger(kv, client, genesis, true) +} + +// NewLedgerWithoutGC should only be used for testing, as it disables GC. +func NewLedgerWithoutGC(t testing.TB, kv store.KV, client *skademlia.Client) *Ledger { + return newLedger(kv, client, nil, false) +} + +func newLedger(kv store.KV, client *skademlia.Client, genesis *string, gc bool) *Ledger { logger := log.Node() metrics := NewMetrics(context.TODO()) @@ -83,8 +93,9 @@ func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { accounts := NewAccounts(kv) - // TODO: disable GC only for test because it's causing test to panic - // go accounts.GC(context.Background()) + if gc { + go accounts.GC(context.Background()) + } rounds, err := NewRounds(kv, sys.PruningLimit) diff --git a/testutil.go b/testutil.go index 056eb377..27f8e6ec 100644 --- a/testutil.go +++ b/testutil.go @@ -96,7 +96,7 @@ func NewTestLedger(t testing.TB, cfg TestLedgerConfig) *TestLedger { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) kv, cleanup := store.NewTestKV(t, "inmem", "db") - ledger := NewLedger(kv, client, nil) + ledger := NewLedgerWithoutGC(t, kv, client) server := client.Listen() RegisterWaveletServer(server, ledger.Protocol()) diff --git a/utils_test.go b/utils_test.go index 4d0d8a30..e789d0be 100644 --- a/utils_test.go +++ b/utils_test.go @@ -91,7 +91,7 @@ func newNode(t *testing.T) (*skademlia.Client, string, func()) { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) kv, cleanup := store.NewTestKV(t, "inmem", "db") - ledger := NewLedger(kv, client, nil) + ledger := NewLedgerWithoutGC(t, kv, client) server := client.Listen() RegisterWaveletServer(server, ledger.Protocol()) From e9527326c7086e09a36a9b9db74057be22687cf0 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 16:28:36 +0800 Subject: [PATCH 09/19] ledger, snowball: revert test code --- ledger.go | 17 ++++------------- snowball.go | 3 ++- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/ledger.go b/ledger.go index 69ef314d..236b31db 100644 --- a/ledger.go +++ b/ledger.go @@ -169,8 +169,7 @@ func (l *Ledger) AddTransaction(tx Transaction) error { if err != nil && errors.Cause(err) != ErrAlreadyExists { if !strings.Contains(errors.Cause(err).Error(), "transaction has no parents") { - logger := log.Node() - logger.Err(err).Msg("failed to add transaction") + fmt.Println(err) } return err } @@ -427,9 +426,7 @@ func (l *Ledger) PullMissingTransactions() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) batch, err := client.DownloadTx(ctx, req) if err != nil { - logger := log.Node() - logger.Err(err).Msg("failed to download missing transactions") - + fmt.Println("failed to download missing transactions:", err) cancel() continue } @@ -440,18 +437,12 @@ func (l *Ledger) PullMissingTransactions() { for _, buf := range batch.Transactions { tx, err := UnmarshalTransaction(bytes.NewReader(buf)) if err != nil { - logger := log.Node() - logger.Err(err). - Hex("tx_id", tx.ID[:]). - Msg("error unmarshaling downloaded tx") + fmt.Printf("error unmarshaling downloaded tx [%v]: %+v\n", err, tx) continue } if err := l.AddTransaction(tx); err != nil && errors.Cause(err) != ErrMissingParents { - logger := log.Node() - logger.Err(err). - Hex("tx_id", tx.ID[:]). - Msg("error adding downloaded tx to graph") + fmt.Printf("error adding downloaded tx to graph [%v]: %+v\n", err, tx) continue } diff --git a/snowball.go b/snowball.go index 829d365e..6f9e2eaa 100644 --- a/snowball.go +++ b/snowball.go @@ -20,6 +20,7 @@ package wavelet import ( + "fmt" "sync" ) @@ -111,7 +112,7 @@ func (s *Snowball) Tick(round *Round) { if s.lastID != round.ID { // Handle termination case. if s.lastID != ZeroRoundID { - //fmt.Printf("Snowball (%s) liveness fault: Last ID is %x with count %d, and new ID is %x.\n", s.name, s.lastID, s.count, round.ID) + fmt.Printf("Snowball (%s) liveness fault: Last ID is %x with count %d, and new ID is %x.\n", s.name, s.lastID, s.count, round.ID) } s.lastID = round.ID From 2a5a5a1cfb974fb4c53da0469e5bb26277912c95 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 19:39:07 +0800 Subject: [PATCH 10/19] ledger: poll round to wait for consensus --- ledger.go | 32 -------------------------------- testutil.go | 18 +++++++++++++++++- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/ledger.go b/ledger.go index 236b31db..d47ac75d 100644 --- a/ledger.go +++ b/ledger.go @@ -71,9 +71,6 @@ type Ledger struct { cacheChunks *LRU sendQuota chan struct{} - - finalizeCh chan struct{} - finalizeChLock sync.RWMutex } func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { @@ -298,29 +295,6 @@ func (l *Ledger) BroadcastingNop() bool { return broadcastNops } -// WaitForConsensus blocks until the ledger reaches consensus. -// It returns false if it took longer than the timeout duration. -func (l *Ledger) WaitForConsensus(timeout time.Duration) bool { - l.finalizeChLock.Lock() - ch := make(chan struct{}) - l.finalizeCh = ch - l.finalizeChLock.Unlock() - - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case <-ch: - return true - - case <-timer.C: - l.finalizeChLock.Lock() - l.finalizeCh = nil - l.finalizeChLock.Unlock() - return false - } -} - // BroadcastNop has the node send a nop transaction should they have sufficient // balance available. They are broadcasted if no other transaction that is not a nop transaction // is not broadcasted by the node after 500 milliseconds. These conditions only apply so long as @@ -742,12 +716,6 @@ FINALIZE_ROUNDS: Msg("Finalized consensus round, and initialized a new round.") //go ExportGraphDOT(finalized, l.graph) - - l.finalizeChLock.RLock() - if l.finalizeCh != nil { - l.finalizeCh <- struct{}{} - } - l.finalizeChLock.RUnlock() } } diff --git a/testutil.go b/testutil.go index 27f8e6ec..de2dcece 100644 --- a/testutil.go +++ b/testutil.go @@ -175,7 +175,23 @@ func (l *TestLedger) Reward() uint64 { func (l *TestLedger) WaitForConsensus() <-chan bool { ch := make(chan bool) go func() { - ch <- l.ledger.WaitForConsensus(time.Second * 10) + start := l.ledger.Rounds().Latest() + timeout := time.NewTimer(time.Second * 10) + ticker := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-timeout.C: + ch <- false + return + + case <-ticker.C: + current := l.ledger.Rounds().Latest() + if current.Index > start.Index { + ch <- true + return + } + } + } }() return ch From cb5898bcd23be4e32da68f9b3bf42e8c278c8145 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 20:35:09 +0800 Subject: [PATCH 11/19] ledger: don't print missing parent error --- ledger.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ledger.go b/ledger.go index d47ac75d..149c3dd6 100644 --- a/ledger.go +++ b/ledger.go @@ -165,7 +165,9 @@ func (l *Ledger) AddTransaction(tx Transaction) error { err := l.graph.AddTransaction(tx) if err != nil && errors.Cause(err) != ErrAlreadyExists { - if !strings.Contains(errors.Cause(err).Error(), "transaction has no parents") { + if !strings.Contains(errors.Cause(err).Error(), "transaction has no parents") && + !strings.Contains(errors.Cause(err).Error(), "parents for transaction are not in graph") { + fmt.Println(err) } return err @@ -287,6 +289,8 @@ func (l *Ledger) Snapshot() *avl.Tree { return l.accounts.Snapshot() } +// BroadcastingNop returns true if the node is +// supposed to broadcast nop transaction. func (l *Ledger) BroadcastingNop() bool { l.broadcastNopsLock.Lock() broadcastNops := l.broadcastNops From 039c3c64d20b7a3a834f3b687b42fc1da983f6cf Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 20:36:21 +0800 Subject: [PATCH 12/19] ledger: allow test to run up to 5 minutes --- ledger_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger_test.go b/ledger_test.go index f25b8ee6..b86c54a3 100644 --- a/ledger_test.go +++ b/ledger_test.go @@ -43,7 +43,7 @@ func TestLedger_BroadcastNop(t *testing.T) { assert.NoError(t, err) } - timeout := time.NewTimer(time.Second * 60) + timeout := time.NewTimer(time.Minute * 5) for { select { case <-timeout.C: From dd3f24d2dcc8c3267e45685f4d09b2b4b4c8f68d Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 22:48:58 +0800 Subject: [PATCH 13/19] api, cmd/graph, cmd/wavelet, ledger: use options --- api/mod_test.go | 21 +++++++++++---------- api/mod_ws_test.go | 2 +- cmd/graph/main.go | 2 +- cmd/wavelet/main.go | 17 +++++++++-------- ledger.go | 33 ++++++++++++++++++++++++--------- testutil.go | 2 +- utils_test.go | 2 +- 7 files changed, 48 insertions(+), 31 deletions(-) diff --git a/api/mod_test.go b/api/mod_test.go index 14730aca..a8a5ab07 100644 --- a/api/mod_test.go +++ b/api/mod_test.go @@ -26,15 +26,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "github.com/buaazp/fasthttprouter" - "github.com/perlin-network/noise/skademlia" - "github.com/perlin-network/wavelet" - "github.com/perlin-network/wavelet/store" - "github.com/perlin-network/wavelet/sys" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/valyala/fasthttp" - "github.com/valyala/fastjson" "io/ioutil" "net" "net/http" @@ -44,6 +35,16 @@ import ( "testing" "testing/quick" "time" + + "github.com/buaazp/fasthttprouter" + "github.com/perlin-network/noise/skademlia" + "github.com/perlin-network/wavelet" + "github.com/perlin-network/wavelet/store" + "github.com/perlin-network/wavelet/sys" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/valyala/fasthttp" + "github.com/valyala/fastjson" ) func TestListTransaction(t *testing.T) { @@ -744,7 +745,7 @@ func createLedger(t *testing.T) *wavelet.Ledger { keys, err := skademlia.NewKeys(1, 1) assert.NoError(t, err) - ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys), nil) + ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys)) return ledger } diff --git a/api/mod_ws_test.go b/api/mod_ws_test.go index 4c84a9e4..347e96fb 100644 --- a/api/mod_ws_test.go +++ b/api/mod_ws_test.go @@ -44,7 +44,7 @@ func TestPollLog(t *testing.T) { keys, err := skademlia.NewKeys(1, 1) assert.NoError(t, err) - ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys), nil) + ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys)) go gateway.StartHTTP(8080, nil, ledger, keys) defer gateway.Shutdown() diff --git a/cmd/graph/main.go b/cmd/graph/main.go index 087a4ac1..2e2e1e66 100644 --- a/cmd/graph/main.go +++ b/cmd/graph/main.go @@ -113,7 +113,7 @@ func main() { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) - ledger := wavelet.NewLedger(store.NewInmem(), client, nil) + ledger := wavelet.NewLedger(store.NewInmem(), client) go func() { server := client.Listen() diff --git a/cmd/wavelet/main.go b/cmd/wavelet/main.go index c53e48b8..454231c2 100644 --- a/cmd/wavelet/main.go +++ b/cmd/wavelet/main.go @@ -23,6 +23,14 @@ import ( "encoding/hex" "errors" "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "sort" + "strconv" + "time" + "github.com/perlin-network/noise" "github.com/perlin-network/noise/cipher" "github.com/perlin-network/noise/edwards25519" @@ -38,13 +46,6 @@ import ( "google.golang.org/grpc" "gopkg.in/urfave/cli.v1" "gopkg.in/urfave/cli.v1/altsrc" - "io/ioutil" - "net" - "net/http" - "os" - "sort" - "strconv" - "time" ) import _ "net/http/pprof" @@ -316,7 +317,7 @@ func start(cfg *Config) { logger.Fatal().Err(err).Msgf("Failed to create/open database located at %q.", cfg.Database) } - ledger := wavelet.NewLedger(kv, client, cfg.Genesis) + ledger := wavelet.NewLedger(kv, client, wavelet.WithGenesis(cfg.Genesis)) go func() { server := client.Listen() diff --git a/ledger.go b/ledger.go index 149c3dd6..f51c7838 100644 --- a/ledger.go +++ b/ledger.go @@ -28,7 +28,6 @@ import ( "math/rand" "strings" "sync" - "testing" "time" "github.com/perlin-network/noise" @@ -73,16 +72,32 @@ type Ledger struct { sendQuota chan struct{} } -func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { - return newLedger(kv, client, genesis, true) +type config struct { + GCDisabled bool + Genesis *string } -// NewLedgerWithoutGC should only be used for testing, as it disables GC. -func NewLedgerWithoutGC(t testing.TB, kv store.KV, client *skademlia.Client) *Ledger { - return newLedger(kv, client, nil, false) +type Option func(cfg *config) + +// WithoutGC disables GC. Used for testing purposes. +func WithoutGC() Option { + return func(cfg *config) { + cfg.GCDisabled = true + } } -func newLedger(kv store.KV, client *skademlia.Client, genesis *string, gc bool) *Ledger { +func WithGenesis(genesis *string) Option { + return func(cfg *config) { + cfg.Genesis = genesis + } +} + +func NewLedger(kv store.KV, client *skademlia.Client, opts ...Option) *Ledger { + var cfg config + for _, opt := range opts { + opt(&cfg) + } + logger := log.Node() metrics := NewMetrics(context.TODO()) @@ -90,7 +105,7 @@ func newLedger(kv store.KV, client *skademlia.Client, genesis *string, gc bool) accounts := NewAccounts(kv) - if gc { + if !cfg.GCDisabled { go accounts.GC(context.Background()) } @@ -99,7 +114,7 @@ func newLedger(kv store.KV, client *skademlia.Client, genesis *string, gc bool) var round *Round if rounds != nil && err != nil { - genesis := performInception(accounts.tree, genesis) + genesis := performInception(accounts.tree, cfg.Genesis) if err := accounts.Commit(nil); err != nil { logger.Fatal().Err(err).Msg("BUG: accounts.Commit") } diff --git a/testutil.go b/testutil.go index de2dcece..06fc5f3a 100644 --- a/testutil.go +++ b/testutil.go @@ -96,7 +96,7 @@ func NewTestLedger(t testing.TB, cfg TestLedgerConfig) *TestLedger { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) kv, cleanup := store.NewTestKV(t, "inmem", "db") - ledger := NewLedgerWithoutGC(t, kv, client) + ledger := NewLedger(kv, client, WithoutGC()) server := client.Listen() RegisterWaveletServer(server, ledger.Protocol()) diff --git a/utils_test.go b/utils_test.go index e789d0be..c1f6cb15 100644 --- a/utils_test.go +++ b/utils_test.go @@ -91,7 +91,7 @@ func newNode(t *testing.T) (*skademlia.Client, string, func()) { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) kv, cleanup := store.NewTestKV(t, "inmem", "db") - ledger := NewLedgerWithoutGC(t, kv, client) + ledger := NewLedger(kv, client, WithoutGC()) server := client.Listen() RegisterWaveletServer(server, ledger.Protocol()) From d34810646284185717ff20ac2cb88303faef1144 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 22:50:56 +0800 Subject: [PATCH 14/19] ledger: remove stray defer statement --- ledger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger.go b/ledger.go index f51c7838..fc1d2162 100644 --- a/ledger.go +++ b/ledger.go @@ -309,7 +309,7 @@ func (l *Ledger) Snapshot() *avl.Tree { func (l *Ledger) BroadcastingNop() bool { l.broadcastNopsLock.Lock() broadcastNops := l.broadcastNops - defer l.broadcastNopsLock.Unlock() + l.broadcastNopsLock.Unlock() return broadcastNops } From b0202d30bf9ca2949b7a11753eb500f0e50eabab Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Tue, 6 Aug 2019 23:11:42 +0800 Subject: [PATCH 15/19] ledger: reduce number of nodes and add sleep --- ledger_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ledger_test.go b/ledger_test.go index b86c54a3..ea866783 100644 --- a/ledger_test.go +++ b/ledger_test.go @@ -19,7 +19,7 @@ func TestLedger_BroadcastNop(t *testing.T) { testnet := NewTestNetwork(t) defer testnet.Cleanup() - for i := 0; i < 20; i++ { + for i := 0; i < 3; i++ { testnet.AddNode(t, 0) } @@ -33,6 +33,10 @@ func TestLedger_BroadcastNop(t *testing.T) { } } + // Sleep for some time to give room for the nodes to + // bootstrap the overlay S/Kademlia network + time.Sleep(time.Second * 3) + // Add lots of transactions txs := make([]Transaction, 1000) var err error From ff4cf30bee4eb447f06e314ebc9f58b57fc38875 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Wed, 7 Aug 2019 14:01:45 +0800 Subject: [PATCH 16/19] ledger: add failing test to check excessive consensus round --- ledger_test.go | 32 ++++++++++++++++++++++++++++++++ testutil.go | 2 +- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/ledger_test.go b/ledger_test.go index ea866783..7fd6a1c8 100644 --- a/ledger_test.go +++ b/ledger_test.go @@ -76,3 +76,35 @@ func TestLedger_BroadcastNop(t *testing.T) { } } } + +func TestLedger_AddTransaction(t *testing.T) { + testnet := NewTestNetwork(t) + defer testnet.Cleanup() + + alice := testnet.AddNode(t, 1000000) + testnet.AddNode(t, 0) // bob + + // Wait for alice to receive her PERL from the faucet + for <-alice.WaitForConsensus() { + if alice.Balance() > 0 { + break + } + } + + start := alice.ledger.Rounds().Latest().Index + + // Add just 1 transaction + _, err := alice.PlaceStake(100) + assert.NoError(t, err) + + // Try to wait for 2 rounds of consensus. + // The second call should result in timeout, because + // only 1 round should be finalized. + <-alice.WaitForConsensus() + <-alice.WaitForConsensus() + + current := alice.ledger.Rounds().Latest().Index + if current-start > 1 { + t.Fatal("more than 1 round finalized") + } +} diff --git a/testutil.go b/testutil.go index 06fc5f3a..a0aef257 100644 --- a/testutil.go +++ b/testutil.go @@ -176,7 +176,7 @@ func (l *TestLedger) WaitForConsensus() <-chan bool { ch := make(chan bool) go func() { start := l.ledger.Rounds().Latest() - timeout := time.NewTimer(time.Second * 10) + timeout := time.NewTimer(time.Second * 3) ticker := time.NewTicker(time.Millisecond * 100) for { select { From 0accc44e5d6170380bed0abe02fc3df843758949 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Wed, 7 Aug 2019 14:31:23 +0800 Subject: [PATCH 17/19] ledger: fix excessive nop --- ledger.go | 2 +- ledger_test.go | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/ledger.go b/ledger.go index fc1d2162..81fd83b3 100644 --- a/ledger.go +++ b/ledger.go @@ -520,7 +520,7 @@ FINALIZE_ROUNDS: // Only stop broadcasting nops if the most recently added transaction // has been applied l.broadcastNopsLock.Lock() - if l.broadcastNops && l.broadcastNopsMaxDepth <= l.graph.RootDepth() { + if l.broadcastNops && l.broadcastNopsMaxDepth <= l.finalizer.Preferred().End.Depth { l.broadcastNops = false } l.broadcastNopsLock.Unlock() diff --git a/ledger_test.go b/ledger_test.go index 7fd6a1c8..af604dcb 100644 --- a/ledger_test.go +++ b/ledger_test.go @@ -81,20 +81,13 @@ func TestLedger_AddTransaction(t *testing.T) { testnet := NewTestNetwork(t) defer testnet.Cleanup() - alice := testnet.AddNode(t, 1000000) - testnet.AddNode(t, 0) // bob - - // Wait for alice to receive her PERL from the faucet - for <-alice.WaitForConsensus() { - if alice.Balance() > 0 { - break - } - } + alice := testnet.AddNode(t, 0) // alice + testnet.AddNode(t, 0) // bob start := alice.ledger.Rounds().Latest().Index // Add just 1 transaction - _, err := alice.PlaceStake(100) + _, err := testnet.faucet.PlaceStake(100) assert.NoError(t, err) // Try to wait for 2 rounds of consensus. From b86ced48ce94df831e4dd292b56b304b636572c0 Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Wed, 7 Aug 2019 14:49:42 +0800 Subject: [PATCH 18/19] ledger: revert error silencing --- ledger.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ledger.go b/ledger.go index 81fd83b3..d646a70e 100644 --- a/ledger.go +++ b/ledger.go @@ -180,9 +180,7 @@ func (l *Ledger) AddTransaction(tx Transaction) error { err := l.graph.AddTransaction(tx) if err != nil && errors.Cause(err) != ErrAlreadyExists { - if !strings.Contains(errors.Cause(err).Error(), "transaction has no parents") && - !strings.Contains(errors.Cause(err).Error(), "parents for transaction are not in graph") { - + if !strings.Contains(errors.Cause(err).Error(), "transaction has no parents") { fmt.Println(err) } return err From a5a36adb18e4333a42c0fae38438a30321084c0b Mon Sep 17 00:00:00 2001 From: Hasyimi Bahrudin Date: Wed, 7 Aug 2019 16:01:07 +0800 Subject: [PATCH 19/19] ledger: improve test --- ledger_test.go | 51 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/ledger_test.go b/ledger_test.go index af604dcb..ad2081b6 100644 --- a/ledger_test.go +++ b/ledger_test.go @@ -2,6 +2,7 @@ package wavelet import ( "fmt" + "sync" "testing" "time" @@ -33,20 +34,26 @@ func TestLedger_BroadcastNop(t *testing.T) { } } - // Sleep for some time to give room for the nodes to - // bootstrap the overlay S/Kademlia network - time.Sleep(time.Second * 3) - // Add lots of transactions - txs := make([]Transaction, 1000) - var err error + var txsLock sync.Mutex + txs := make([]Transaction, 0, 10000) - fmt.Println("adding transactions...") - for i := 0; i < len(txs); i++ { - txs[i], err = alice.Pay(bob, 1) - assert.NoError(t, err) - } + go func() { + for i := 0; i < cap(txs); i++ { + tx, err := alice.Pay(bob, 1) + assert.NoError(t, err) + + txsLock.Lock() + txs = append(txs, tx) + txsLock.Unlock() + + // Somehow this prevents AddTransaction from + // returning ErrMissingParents + time.Sleep(time.Nanosecond * 1) + } + }() + prevRound := alice.ledger.Rounds().Latest().Index timeout := time.NewTimer(time.Minute * 5) for { select { @@ -55,22 +62,38 @@ func TestLedger_BroadcastNop(t *testing.T) { case <-alice.WaitForConsensus(): var appliedCount int + var txsCount int + + txsLock.Lock() for _, tx := range txs { if alice.Applied(tx) { appliedCount++ } + txsCount++ + } + txsLock.Unlock() + + currRound := alice.ledger.Rounds().Latest().Index + + fmt.Printf("%d/%d tx applied, round=%d, root depth=%d\n", + appliedCount, txsCount, + currRound, + alice.ledger.Graph().RootDepth()) + + if currRound-prevRound > 1 { + t.Fatal("more than 1 round finalized") } - fmt.Printf("%d/%d tx applied\n", appliedCount, len(txs)) + prevRound = currRound - if appliedCount < len(txs) { + if appliedCount < cap(txs) { assert.True(t, alice.ledger.BroadcastingNop(), "node should not stop broadcasting nop while there are unapplied tx") } // The test is successful if all tx are applied, // and nop broadcasting is stopped once all tx are applied - if appliedCount == len(txs) && !alice.ledger.BroadcastingNop() { + if appliedCount == cap(txs) && !alice.ledger.BroadcastingNop() { return } }