diff --git a/accounts.go b/accounts.go index 7b65af0a..6e656c2b 100644 --- a/accounts.go +++ b/accounts.go @@ -21,13 +21,14 @@ package wavelet import ( "context" - "github.com/perlin-network/wavelet/avl" - "github.com/perlin-network/wavelet/store" - "github.com/pkg/errors" "sync" "sync/atomic" "time" "unsafe" + + "github.com/perlin-network/wavelet/avl" + "github.com/perlin-network/wavelet/store" + "github.com/pkg/errors" ) type Accounts struct { diff --git a/api/mod_test.go b/api/mod_test.go index 14730aca..a8a5ab07 100644 --- a/api/mod_test.go +++ b/api/mod_test.go @@ -26,15 +26,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "github.com/buaazp/fasthttprouter" - "github.com/perlin-network/noise/skademlia" - "github.com/perlin-network/wavelet" - "github.com/perlin-network/wavelet/store" - "github.com/perlin-network/wavelet/sys" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/valyala/fasthttp" - "github.com/valyala/fastjson" "io/ioutil" "net" "net/http" @@ -44,6 +35,16 @@ import ( "testing" "testing/quick" "time" + + "github.com/buaazp/fasthttprouter" + "github.com/perlin-network/noise/skademlia" + "github.com/perlin-network/wavelet" + "github.com/perlin-network/wavelet/store" + "github.com/perlin-network/wavelet/sys" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/valyala/fasthttp" + "github.com/valyala/fastjson" ) func TestListTransaction(t *testing.T) { @@ -744,7 +745,7 @@ func createLedger(t *testing.T) *wavelet.Ledger { keys, err := skademlia.NewKeys(1, 1) assert.NoError(t, err) - ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys), nil) + ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys)) return ledger } diff --git a/api/mod_ws_test.go b/api/mod_ws_test.go index 4c84a9e4..347e96fb 100644 --- a/api/mod_ws_test.go +++ b/api/mod_ws_test.go @@ -44,7 +44,7 @@ func TestPollLog(t *testing.T) { keys, err := skademlia.NewKeys(1, 1) assert.NoError(t, err) - ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys), nil) + ledger := wavelet.NewLedger(store.NewInmem(), skademlia.NewClient(":0", keys)) go gateway.StartHTTP(8080, nil, ledger, keys) defer gateway.Shutdown() diff --git a/cmd/graph/main.go b/cmd/graph/main.go index 087a4ac1..2e2e1e66 100644 --- a/cmd/graph/main.go +++ b/cmd/graph/main.go @@ -113,7 +113,7 @@ func main() { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) - ledger := wavelet.NewLedger(store.NewInmem(), client, nil) + ledger := wavelet.NewLedger(store.NewInmem(), client) go func() { server := client.Listen() diff --git a/cmd/wavelet/main.go b/cmd/wavelet/main.go index c53e48b8..454231c2 100644 --- a/cmd/wavelet/main.go +++ b/cmd/wavelet/main.go @@ -23,6 +23,14 @@ import ( "encoding/hex" "errors" "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "sort" + "strconv" + "time" + "github.com/perlin-network/noise" "github.com/perlin-network/noise/cipher" "github.com/perlin-network/noise/edwards25519" @@ -38,13 +46,6 @@ import ( "google.golang.org/grpc" "gopkg.in/urfave/cli.v1" "gopkg.in/urfave/cli.v1/altsrc" - "io/ioutil" - "net" - "net/http" - "os" - "sort" - "strconv" - "time" ) import _ "net/http/pprof" @@ -316,7 +317,7 @@ func start(cfg *Config) { logger.Fatal().Err(err).Msgf("Failed to create/open database located at %q.", cfg.Database) } - ledger := wavelet.NewLedger(kv, client, cfg.Genesis) + ledger := wavelet.NewLedger(kv, client, wavelet.WithGenesis(cfg.Genesis)) go func() { server := client.Listen() diff --git a/ledger.go b/ledger.go index 9f56ec31..d646a70e 100644 --- a/ledger.go +++ b/ledger.go @@ -25,6 +25,11 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "math/rand" + "strings" + "sync" + "time" + "github.com/perlin-network/noise" "github.com/perlin-network/noise/skademlia" "github.com/perlin-network/wavelet/avl" @@ -36,10 +41,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/peer" - "math/rand" - "strings" - "sync" - "time" ) type Ledger struct { @@ -57,9 +58,10 @@ type Ledger struct { consensus sync.WaitGroup - broadcastNops bool - broadcastNopsDelay time.Time - broadcastNopsLock sync.Mutex + broadcastNops bool + broadcastNopsMaxDepth uint64 + broadcastNopsDelay time.Time + broadcastNopsLock sync.Mutex sync chan struct{} syncVotes chan vote @@ -70,21 +72,49 @@ type Ledger struct { sendQuota chan struct{} } -func NewLedger(kv store.KV, client *skademlia.Client, genesis *string) *Ledger { +type config struct { + GCDisabled bool + Genesis *string +} + +type Option func(cfg *config) + +// WithoutGC disables GC. Used for testing purposes. +func WithoutGC() Option { + return func(cfg *config) { + cfg.GCDisabled = true + } +} + +func WithGenesis(genesis *string) Option { + return func(cfg *config) { + cfg.Genesis = genesis + } +} + +func NewLedger(kv store.KV, client *skademlia.Client, opts ...Option) *Ledger { + var cfg config + for _, opt := range opts { + opt(&cfg) + } + logger := log.Node() metrics := NewMetrics(context.TODO()) indexer := NewIndexer() accounts := NewAccounts(kv) - go accounts.GC(context.Background()) + + if !cfg.GCDisabled { + go accounts.GC(context.Background()) + } rounds, err := NewRounds(kv, sys.PruningLimit) var round *Round if rounds != nil && err != nil { - genesis := performInception(accounts.tree, genesis) + genesis := performInception(accounts.tree, cfg.Genesis) if err := accounts.Commit(nil); err != nil { logger.Fatal().Err(err).Msg("BUG: accounts.Commit") } @@ -162,12 +192,13 @@ func (l *Ledger) AddTransaction(tx Transaction) error { l.gossiper.Push(tx) l.broadcastNopsLock.Lock() - if tx.Tag != sys.TagNop { + if tx.Tag != sys.TagNop && tx.Sender == l.client.Keys().PublicKey() { + l.broadcastNops = true l.broadcastNopsDelay = time.Now() - } - if tx.Sender == l.client.Keys().PublicKey() && l.finalizer.Preferred() == nil { - l.broadcastNops = true + if tx.Depth > l.broadcastNopsMaxDepth { + l.broadcastNopsMaxDepth = tx.Depth + } } l.broadcastNopsLock.Unlock() } @@ -271,6 +302,16 @@ func (l *Ledger) Snapshot() *avl.Tree { return l.accounts.Snapshot() } +// BroadcastingNop returns true if the node is +// supposed to broadcast nop transaction. +func (l *Ledger) BroadcastingNop() bool { + l.broadcastNopsLock.Lock() + broadcastNops := l.broadcastNops + l.broadcastNopsLock.Unlock() + + return broadcastNops +} + // BroadcastNop has the node send a nop transaction should they have sufficient // balance available. They are broadcasted if no other transaction that is not a nop transaction // is not broadcasted by the node after 500 milliseconds. These conditions only apply so long as @@ -387,7 +428,7 @@ func (l *Ledger) PullMissingTransactions() { for _, buf := range batch.Transactions { tx, err := UnmarshalTransaction(bytes.NewReader(buf)) if err != nil { - fmt.Printf("error unmarshaling downloaded tx [%v]: %+v", err, tx) + fmt.Printf("error unmarshaling downloaded tx [%v]: %+v\n", err, tx) continue } @@ -474,8 +515,12 @@ FINALIZE_ROUNDS: continue FINALIZE_ROUNDS } + // Only stop broadcasting nops if the most recently added transaction + // has been applied l.broadcastNopsLock.Lock() - l.broadcastNops = false + if l.broadcastNops && l.broadcastNopsMaxDepth <= l.finalizer.Preferred().End.Depth { + l.broadcastNops = false + } l.broadcastNopsLock.Unlock() workerChan := make(chan *grpc.ClientConn, 16) diff --git a/ledger_test.go b/ledger_test.go new file mode 100644 index 00000000..ad2081b6 --- /dev/null +++ b/ledger_test.go @@ -0,0 +1,126 @@ +package wavelet + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// TestLedger_BroadcastNop checks that: +// +// * The ledger will keep broadcasting nop tx as long +// as there are unapplied tx (latestTxDepth <= rootDepth). +// +// * The ledger will stop broadcasting nop once there +// are no more unapplied tx. +func TestLedger_BroadcastNop(t *testing.T) { + testnet := NewTestNetwork(t) + defer testnet.Cleanup() + + for i := 0; i < 3; i++ { + testnet.AddNode(t, 0) + } + + alice := testnet.AddNode(t, 1000000) + bob := testnet.AddNode(t, 0) + + // Wait for alice to receive her PERL from the faucet + for <-alice.WaitForConsensus() { + if alice.Balance() > 0 { + break + } + } + + // Add lots of transactions + var txsLock sync.Mutex + txs := make([]Transaction, 0, 10000) + + go func() { + for i := 0; i < cap(txs); i++ { + tx, err := alice.Pay(bob, 1) + assert.NoError(t, err) + + txsLock.Lock() + txs = append(txs, tx) + txsLock.Unlock() + + // Somehow this prevents AddTransaction from + // returning ErrMissingParents + time.Sleep(time.Nanosecond * 1) + } + }() + + prevRound := alice.ledger.Rounds().Latest().Index + timeout := time.NewTimer(time.Minute * 5) + for { + select { + case <-timeout.C: + t.Fatal("timed out before all transactions are applied") + + case <-alice.WaitForConsensus(): + var appliedCount int + var txsCount int + + txsLock.Lock() + for _, tx := range txs { + if alice.Applied(tx) { + appliedCount++ + } + txsCount++ + } + txsLock.Unlock() + + currRound := alice.ledger.Rounds().Latest().Index + + fmt.Printf("%d/%d tx applied, round=%d, root depth=%d\n", + appliedCount, txsCount, + currRound, + alice.ledger.Graph().RootDepth()) + + if currRound-prevRound > 1 { + t.Fatal("more than 1 round finalized") + } + + prevRound = currRound + + if appliedCount < cap(txs) { + assert.True(t, alice.ledger.BroadcastingNop(), + "node should not stop broadcasting nop while there are unapplied tx") + } + + // The test is successful if all tx are applied, + // and nop broadcasting is stopped once all tx are applied + if appliedCount == cap(txs) && !alice.ledger.BroadcastingNop() { + return + } + } + } +} + +func TestLedger_AddTransaction(t *testing.T) { + testnet := NewTestNetwork(t) + defer testnet.Cleanup() + + alice := testnet.AddNode(t, 0) // alice + testnet.AddNode(t, 0) // bob + + start := alice.ledger.Rounds().Latest().Index + + // Add just 1 transaction + _, err := testnet.faucet.PlaceStake(100) + assert.NoError(t, err) + + // Try to wait for 2 rounds of consensus. + // The second call should result in timeout, because + // only 1 round should be finalized. + <-alice.WaitForConsensus() + <-alice.WaitForConsensus() + + current := alice.ledger.Rounds().Latest().Index + if current-start > 1 { + t.Fatal("more than 1 round finalized") + } +} diff --git a/testutil.go b/testutil.go new file mode 100644 index 00000000..a0aef257 --- /dev/null +++ b/testutil.go @@ -0,0 +1,339 @@ +package wavelet + +import ( + "encoding/hex" + "fmt" + "net" + "strconv" + "sync" + "testing" + "time" + + "github.com/perlin-network/noise" + "github.com/perlin-network/noise/cipher" + "github.com/perlin-network/noise/edwards25519" + "github.com/perlin-network/noise/handshake" + "github.com/perlin-network/noise/skademlia" + "github.com/perlin-network/wavelet/store" + "github.com/perlin-network/wavelet/sys" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +type TestNetwork struct { + faucet *TestLedger + nodes []*TestLedger +} + +func NewTestNetwork(t testing.TB) *TestNetwork { + return &TestNetwork{ + faucet: NewTestFaucet(t), + nodes: []*TestLedger{}, + } +} + +func (n *TestNetwork) Cleanup() { + for _, node := range n.nodes { + node.Cleanup() + } + n.faucet.Cleanup() +} + +func (n *TestNetwork) AddNode(t testing.TB, startingBalance uint64) *TestLedger { + node := NewTestLedger(t, TestLedgerConfig{ + Peers: []string{n.faucet.Addr()}, + }) + n.nodes = append(n.nodes, node) + + if startingBalance > 0 { + _, err := n.faucet.Pay(node, startingBalance) + assert.NoError(t, err) + } + + return node +} + +func (n *TestNetwork) WaitForConsensus(t testing.TB) { + all := append(n.nodes, n.faucet) + TestWaitForConsensus(t, time.Second*30, all) +} + +type TestLedger struct { + ledger *Ledger + server *grpc.Server + addr string + kv store.KV + kvCleanup func() + finalized chan struct{} + stopped chan struct{} +} + +type TestLedgerConfig struct { + Wallet string + Peers []string +} + +func defaultConfig(t testing.TB) *TestLedgerConfig { + return &TestLedgerConfig{} +} + +// NewTestFaucet returns an account with a lot of PERLs. +func NewTestFaucet(t testing.TB) *TestLedger { + return NewTestLedger(t, TestLedgerConfig{ + Wallet: "87a6813c3b4cf534b6ae82db9b1409fa7dbd5c13dba5858970b56084c4a930eb400056ee68a7cc2695222df05ea76875bc27ec6e61e8e62317c336157019c405", + }) +} + +func NewTestLedger(t testing.TB, cfg TestLedgerConfig) *TestLedger { + keys := loadKeys(t, cfg.Wallet) + + ln, err := net.Listen("tcp", ":0") + assert.NoError(t, err) + + addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(ln.Addr().(*net.TCPAddr).Port)) + + client := skademlia.NewClient(addr, keys, skademlia.WithC1(sys.SKademliaC1), skademlia.WithC2(sys.SKademliaC2)) + client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) + + kv, cleanup := store.NewTestKV(t, "inmem", "db") + ledger := NewLedger(kv, client, WithoutGC()) + server := client.Listen() + RegisterWaveletServer(server, ledger.Protocol()) + + stopped := make(chan struct{}) + go func() { + defer close(stopped) + if err := server.Serve(ln); err != nil && err != grpc.ErrServerStopped { + t.Fatal(err) + } + }() + + for _, addr := range cfg.Peers { + if _, err := client.Dial(addr); err != nil { + t.Fatal(err) + } + } + + client.Bootstrap() + + return &TestLedger{ + ledger: ledger, + server: server, + addr: addr, + kv: kv, + kvCleanup: cleanup, + stopped: stopped, + } +} + +func (l *TestLedger) Cleanup() { + l.server.GracefulStop() + <-l.stopped + + //l.kvCleanup() +} + +func (l *TestLedger) Addr() string { + return l.addr +} + +func (l *TestLedger) PublicKey() AccountID { + keys := l.ledger.client.Keys() + return keys.PublicKey() +} + +func (l *TestLedger) Balance() uint64 { + snapshot := l.ledger.Snapshot() + balance, _ := ReadAccountBalance(snapshot, l.PublicKey()) + return balance +} + +func (l *TestLedger) BalanceOfAccount(node *TestLedger) uint64 { + snapshot := l.ledger.Snapshot() + balance, _ := ReadAccountBalance(snapshot, node.PublicKey()) + return balance +} + +func (l *TestLedger) Stake() uint64 { + snapshot := l.ledger.Snapshot() + stake, _ := ReadAccountStake(snapshot, l.PublicKey()) + return stake +} + +func (l *TestLedger) StakeOfAccount(node *TestLedger) uint64 { + snapshot := l.ledger.Snapshot() + balance, _ := ReadAccountStake(snapshot, node.PublicKey()) + return balance +} + +func (l *TestLedger) Reward() uint64 { + snapshot := l.ledger.Snapshot() + reward, _ := ReadAccountReward(snapshot, l.PublicKey()) + return reward +} + +func (l *TestLedger) WaitForConsensus() <-chan bool { + ch := make(chan bool) + go func() { + start := l.ledger.Rounds().Latest() + timeout := time.NewTimer(time.Second * 3) + ticker := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-timeout.C: + ch <- false + return + + case <-ticker.C: + current := l.ledger.Rounds().Latest() + if current.Index > start.Index { + ch <- true + return + } + } + } + }() + + return ch +} + +func (l *TestLedger) Nop() (Transaction, error) { + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagNop, nil), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) Pay(to *TestLedger, amount uint64) (Transaction, error) { + payload := Transfer{ + Recipient: to.PublicKey(), + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagTransfer, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) PlaceStake(amount uint64) (Transaction, error) { + payload := Stake{ + Opcode: sys.PlaceStake, + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagStake, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) WithdrawStake(amount uint64) (Transaction, error) { + payload := Stake{ + Opcode: sys.WithdrawStake, + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagStake, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) WithdrawReward(amount uint64) (Transaction, error) { + payload := Stake{ + Opcode: sys.WithdrawReward, + Amount: amount, + } + + keys := l.ledger.client.Keys() + tx := AttachSenderToTransaction( + keys, + NewTransaction(keys, sys.TagStake, payload.Marshal()), + l.ledger.Graph().FindEligibleParents()...) + + err := l.ledger.AddTransaction(tx) + return tx, err +} + +func (l *TestLedger) FindTransaction(t testing.TB, id TransactionID) *Transaction { + return l.ledger.Graph().FindTransaction(id) +} + +func (l *TestLedger) Applied(tx Transaction) bool { + return tx.Depth <= l.ledger.Graph().RootDepth() +} + +func TestWaitForConsensus(t testing.TB, timeout time.Duration, ledgers []*TestLedger) { + var wg sync.WaitGroup + for _, l := range ledgers { + wg.Add(1) + go func(ledger *TestLedger) { + defer wg.Done() + ledger.WaitForConsensus() + }(l) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.NewTimer(timeout) + select { + case <-done: + return + + case <-timer.C: + t.Fatal("consensus round took too long") + } +} + +// loadKeys returns a keypair from a wallet string, or generates a new one +// if no wallet is provided. +func loadKeys(t testing.TB, wallet string) *skademlia.Keypair { + // Generate a keypair if wallet is empty + if wallet == "" { + keys, err := skademlia.NewKeys(sys.SKademliaC1, sys.SKademliaC2) + assert.NoError(t, err) + return keys + } + + if len(wallet) != hex.EncodedLen(edwards25519.SizePrivateKey) { + t.Fatal(fmt.Errorf("private key is not of the right length")) + } + + var privateKey edwards25519.PrivateKey + n, err := hex.Decode(privateKey[:], []byte(wallet)) + if err != nil { + t.Fatal(err) + } + + if n != edwards25519.SizePrivateKey { + t.Fatal(fmt.Errorf("private key is not of the right length")) + } + + keys, err := skademlia.LoadKeys(privateKey, sys.SKademliaC1, sys.SKademliaC2) + if err != nil { + t.Fatal(err) + } + + return keys +} diff --git a/utils_test.go b/utils_test.go index 4d0d8a30..c1f6cb15 100644 --- a/utils_test.go +++ b/utils_test.go @@ -91,7 +91,7 @@ func newNode(t *testing.T) (*skademlia.Client, string, func()) { client.SetCredentials(noise.NewCredentials(addr, handshake.NewECDH(), cipher.NewAEAD(), client.Protocol())) kv, cleanup := store.NewTestKV(t, "inmem", "db") - ledger := NewLedger(kv, client, nil) + ledger := NewLedger(kv, client, WithoutGC()) server := client.Listen() RegisterWaveletServer(server, ledger.Protocol())