Skip to content

Commit

Permalink
WIP: Raft consensus for lotus nodes in a cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Shrenuj Bansal committed Sep 12, 2022
1 parent 2532300 commit 1dc9115
Show file tree
Hide file tree
Showing 26 changed files with 2,317 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,11 @@ workflows:
suite: itest-paych_cli
target: "./itests/paych_cli_test.go"

- test:
name: test-itest-raft_messagesigner
suite: itest-raft_messagesigner
target: "./itests/raft_messagesigner_test.go"

- test:
name: test-itest-sdr_upgrade
suite: itest-sdr_upgrade
Expand Down
3 changes: 3 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -751,6 +752,8 @@ type FullNode interface {
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error //perm:admin

RaftState(ctx context.Context) (consensus.State, error) //perm:read
}

type StorageAsk struct {
Expand Down
16 changes: 16 additions & 0 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type MessageSendSpec struct {
MsgUuid uuid.UUID
}

type MpoolMessageWhole struct {
Msg *types.Message
Spec *MessageSendSpec
}

// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
type GraphSyncDataTransfer struct {
// GraphSync request id for this transfer
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
47 changes: 37 additions & 10 deletions chain/messagesigner/messagesigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
consensus "github.com/libp2p/go-libp2p-consensus"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

Expand All @@ -29,6 +30,18 @@ type MpoolNonceAPI interface {
GetActor(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
}

type MsgSigner interface {
SignMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, cb func(*types.SignedMessage) error) (*types.SignedMessage, error)
GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error)
StoreSignedMessage(ctx context.Context, uuid uuid.UUID, message *types.SignedMessage) error
NextNonce(ctx context.Context, addr address.Address) (uint64, error)
SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error
DstoreKey(addr address.Address) datastore.Key
IsLeader(ctx context.Context) bool
RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error)
GetRaftState(ctx context.Context) (consensus.State, error)
}

// MessageSigner keeps track of nonces per address, and increments the nonce
// when signing a message
type MessageSigner struct {
Expand All @@ -38,6 +51,8 @@ type MessageSigner struct {
ds datastore.Batching
}

//var _ full.MsgSigner = &MessageSigner{}

func NewMessageSigner(wallet api.Wallet, mpool MpoolNonceAPI, ds dtypes.MetadataDS) *MessageSigner {
ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/"))
return &MessageSigner{
Expand All @@ -49,12 +64,12 @@ func NewMessageSigner(wallet api.Wallet, mpool MpoolNonceAPI, ds dtypes.Metadata

// SignMessage increments the nonce for the message From address, and signs
// the message
func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {
func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {
ms.lk.Lock()
defer ms.lk.Unlock()

// Get the next message nonce
nonce, err := ms.nextNonce(ctx, msg.From)
nonce, err := ms.NextNonce(ctx, msg.From)
if err != nil {
return nil, xerrors.Errorf("failed to create nonce: %w", err)
}
Expand Down Expand Up @@ -86,7 +101,7 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
}

// If the callback executed successfully, write the nonce to the datastore
if err := ms.saveNonce(ctx, msg.From, nonce); err != nil {
if err := ms.SaveNonce(ctx, msg.From, nonce); err != nil {
return nil, xerrors.Errorf("failed to save nonce: %w", err)
}

Expand All @@ -113,9 +128,9 @@ func (ms *MessageSigner) StoreSignedMessage(ctx context.Context, uuid uuid.UUID,
return ms.ds.Put(ctx, key, serializedMsg)
}

// nextNonce gets the next nonce for the given address.
// NextNonce gets the next nonce for the given address.
// If there is no nonce in the datastore, gets the nonce from the message pool.
func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (uint64, error) {
func (ms *MessageSigner) NextNonce(ctx context.Context, addr address.Address) (uint64, error) {
// Nonces used to be created by the mempool and we need to support nodes
// that have mempool nonces, so first check the mempool for a nonce for
// this address. Note that the mempool returns the actor state's nonce
Expand All @@ -126,7 +141,7 @@ func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (u
}

// Get the next nonce for this address from the datastore
addrNonceKey := ms.dstoreKey(addr)
addrNonceKey := ms.DstoreKey(addr)
dsNonceBytes, err := ms.ds.Get(ctx, addrNonceKey)

switch {
Expand Down Expand Up @@ -159,14 +174,14 @@ func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (u
}
}

// saveNonce increments the nonce for this address and writes it to the
// SaveNonce increments the nonce for this address and writes it to the
// datastore
func (ms *MessageSigner) saveNonce(ctx context.Context, addr address.Address, nonce uint64) error {
func (ms *MessageSigner) SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error {
// Increment the nonce
nonce++

// Write the nonce to the datastore
addrNonceKey := ms.dstoreKey(addr)
addrNonceKey := ms.DstoreKey(addr)
buf := bytes.Buffer{}
_, err := buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce))
if err != nil {
Expand All @@ -179,6 +194,18 @@ func (ms *MessageSigner) saveNonce(ctx context.Context, addr address.Address, no
return nil
}

func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key {
func (ms *MessageSigner) DstoreKey(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
}

func (ms *MessageSigner) IsLeader(ctx context.Context) bool {
return true
}

func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
return false, xerrors.Errorf("Single node shouldn't have any redirects")
}

func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) {
return nil, xerrors.Errorf("This is a non raft consensus message signer")
}
122 changes: 122 additions & 0 deletions chain/messagesigner/messagesigner_consensus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package messagesigner

import (
"context"
"fmt"
"reflect"

"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
libp2pconsensus "github.com/libp2p/go-libp2p-consensus"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
consensus "github.com/filecoin-project/lotus/lib/consensus/raft"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

type MessageSignerConsensus struct {
//msgSigner MessageSigner
MsgSigner
consensus *consensus.Consensus
}

//var _ full.MsgSigner = &MessageSignerConsensus{}

func NewMessageSignerConsensus(
wallet api.Wallet,
mpool MpoolNonceAPI,
ds dtypes.MetadataDS,
consensus *consensus.Consensus) *MessageSignerConsensus {

ds = namespace.Wrap(ds, datastore.NewKey("/message-signer-consensus/"))
return &MessageSignerConsensus{
MsgSigner: &MessageSigner{
wallet: wallet,
mpool: mpool,
ds: ds,
},
consensus: consensus,
}
}

func (ms *MessageSignerConsensus) IsLeader(ctx context.Context) bool {
return ms.consensus.IsLeader(ctx)
}

func (ms *MessageSignerConsensus) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
ok, err := ms.consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage))
if err != nil {
return ok, err
}
return ok, nil
}

func (ms *MessageSignerConsensus) SignMessage(
ctx context.Context,
msg *types.Message,
spec *api.MessageSendSpec,
cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {

signedMsg, err := ms.MsgSigner.SignMessage(ctx, msg, spec, cb)
if err != nil {
return nil, err
}
//u := uuid.New()
//if spec != nil {
// u = spec.MsgUuid
//}

//leader, err := ms.consensus.Leader(ctx)
////curr := ms.consensus.IsLeader(ctx)
//log.Infof("Consensus leader: ", leader, "current node is leader: ", ms.consensus.IsLeader(ctx))

op := &consensus.ConsensusOp{signedMsg.Message.Nonce, spec.MsgUuid, signedMsg.Message.From, signedMsg}
err = ms.consensus.Commit(ctx, op)
if err != nil {
return nil, err
}

return signedMsg, nil
}

func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) {
state, err := ms.consensus.State(ctx)
if err != nil {
return nil, err
}

log.Infof("!!!!!!!!!!!!!!!!!!!!!!!STate type: %v", reflect.TypeOf(state))

cstate := state.(consensus.RaftState)
msg, ok := cstate.MsgUuids[uuid]
if !ok {
return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid)
}
return msg, nil
}

func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (libp2pconsensus.State, error) {
fmt.Println("Gets to message signer consensus raft state")
return ms.consensus.State(ctx)
}

//func (ms *MessageSignerConsensus) StoreSignedMessage(ctx context.Context, uuid uuid.UUID, message *types.SignedMessage) error {
//
// ms.consensus
// return ms.MsgSigner.StoreSignedMessage(ctx, uuid, message)
//}
//
//func (ms *MessageSignerConsensus) NextNonce(ctx context.Context, addr address.Address) (uint64, error) {
// return ms.msgSigner.NextNonce(ctx, addr)
//}
//
//func (ms *MessageSignerConsensus) SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error {
// return ms.msgSigner.SaveNonce(ctx, addr, nonce)
//}
//
//func (ms *MessageSignerConsensus) DstoreKey(addr address.Address) datastore.Key {
// return ms.msgSigner.DstoreKey(addr)
//}
14 changes: 14 additions & 0 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@
* [PaychVoucherCreate](#PaychVoucherCreate)
* [PaychVoucherList](#PaychVoucherList)
* [PaychVoucherSubmit](#PaychVoucherSubmit)
* [Raft](#Raft)
* [RaftState](#RaftState)
* [State](#State)
* [StateAccountKey](#StateAccountKey)
* [StateActorCodeCIDs](#StateActorCodeCIDs)
Expand Down Expand Up @@ -5047,6 +5049,18 @@ Response:
}
```

## Raft


### RaftState


Perms: read

Inputs: `null`

Response: `{}`

## State
The State methods are used to query, inspect, and interact with chain state.
Most methods take a TipSetKey as a parameter. The state looked up is the parent state of the tipset.
Expand Down
Loading

0 comments on commit 1dc9115

Please sign in to comment.