Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft consensus for lotus nodes in a cluster #9294

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8f1b1bb
WIP: Raft consensus for lotus nodes in a cluster
Sep 8, 2022
a1f2fdb
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Sep 12, 2022
4171be0
Few more changes
Sep 12, 2022
3441224
WIP: rest of the stuff
Sep 13, 2022
4be8861
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Sep 13, 2022
81c729e
Cluster raft config changes
Sep 13, 2022
1fe4aa3
Add Auth func for gorpc and address comments
Sep 21, 2022
7470549
Address moar comments
Sep 22, 2022
99e7c32
More wip
Sep 27, 2022
559c2c6
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Sep 27, 2022
570f614
Retries within proxy working
Sep 28, 2022
f89a682
Add Mpool ref to raft state and rearrange some APIs
Sep 29, 2022
b8060cd
Add persistent stores for cluster raft data
Sep 29, 2022
986c5e3
Use multiaddrs in config for raft peerset
Sep 30, 2022
dde204f
Change Mpool push API to have an option to publish
Oct 4, 2022
9848182
solution for mining loop hitting the same node
Oct 5, 2022
17a7722
Ignore mpool msg existing errors for applying raft state
Oct 6, 2022
139f877
fix some bugs and address some comments
Oct 17, 2022
b77ca54
Change cli cmd to API with proxy
Oct 17, 2022
900525f
some cleanup
Oct 17, 2022
674427a
fix lint and make gen
Oct 18, 2022
15ed1ee
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Oct 18, 2022
ad8b959
Address more comments and add test for gorpc auth
Oct 18, 2022
94bd4d8
make gen
Oct 18, 2022
09e9562
i hate make gen
Oct 18, 2022
2fa21ff
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Nov 11, 2022
2681c2a
Change config name from Raft to Cluster
Nov 11, 2022
8740fb4
remove 2nd rpc closer call
Nov 11, 2022
b541cf9
Remove double stop
Nov 14, 2022
800d9de
Address comments
Nov 14, 2022
a66619f
update filecoin-ffi
Nov 14, 2022
ab1eeeb
one more
Nov 14, 2022
f14a25a
make gen and docsgen
Nov 14, 2022
b95d1a6
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Nov 14, 2022
4b11b45
remove comments
Nov 15, 2022
9451221
remove moar commented out code
Nov 15, 2022
c0925ff
Remove some configs
Nov 15, 2022
22f3fbb
Add comment to Push API
Nov 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,11 @@ workflows:
suite: itest-mpool_msg_uuid
target: "./itests/mpool_msg_uuid_test.go"

- test:
name: test-itest-mpool_push_with_uuid
suite: itest-mpool_push_with_uuid
target: "./itests/mpool_push_with_uuid_test.go"

- test:
name: test-itest-multisig
suite: itest-multisig
Expand Down Expand Up @@ -955,6 +960,11 @@ workflows:
suite: itest-pending_deal_allocation
target: "./itests/pending_deal_allocation_test.go"

- test:
name: test-itest-raft_messagesigner
suite: itest-raft_messagesigner
target: "./itests/raft_messagesigner_test.go"

- test:
name: test-itest-remove_verifreg_datacap
suite: itest-remove_verifreg_datacap
Expand Down
3 changes: 3 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,9 @@ type FullNode interface {
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error //perm:admin

RaftState(ctx context.Context) (*RaftStateData, error) //perm:read
RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
}

type StorageAsk struct {
Expand Down
4 changes: 4 additions & 0 deletions api/docgen/docgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ func init() {
addExample(map[string]bitfield.BitField{
"": bitfield.NewFromSet([]uint64{5, 6, 7, 10}),
})
addExample(&api.RaftStateData{
NonceMap: make(map[address.Address]uint64),
MsgUuids: make(map[uuid.UUID]*types.SignedMessage),
})

addExample(http.Header{
"Authorization": []string{"Bearer ey.."},
Expand Down
30 changes: 30 additions & 0 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ type MessageSendSpec struct {
MsgUuid uuid.UUID
}

type MpoolMessageWhole struct {
Msg *types.Message
Spec *MessageSendSpec
}

// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
type GraphSyncDataTransfer struct {
// GraphSync request id for this transfer
Expand Down Expand Up @@ -334,3 +339,61 @@ type ForkUpgradeParams struct {
UpgradeSkyrHeight abi.ChainEpoch
UpgradeSharkHeight abi.ChainEpoch
}

type NonceMapType map[address.Address]uint64
type MsgUuidMapType map[uuid.UUID]*types.SignedMessage
Comment on lines +343 to +344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed to land this PR, but if we wanted to be fancy, we could write a generic helper type for maps with typed keys.


type RaftStateData struct {
NonceMap NonceMapType
MsgUuids MsgUuidMapType
}

func (n *NonceMapType) MarshalJSON() ([]byte, error) {
marshalled := make(map[string]uint64)
for a, n := range *n {
marshalled[a.String()] = n
}
return json.Marshal(marshalled)
}

func (n *NonceMapType) UnmarshalJSON(b []byte) error {
unmarshalled := make(map[string]uint64)
err := json.Unmarshal(b, &unmarshalled)
if err != nil {
return err
}
*n = make(map[address.Address]uint64)
for saddr, nonce := range unmarshalled {
a, err := address.NewFromString(saddr)
if err != nil {
return err
}
(*n)[a] = nonce
}
return nil
}

func (m *MsgUuidMapType) MarshalJSON() ([]byte, error) {
marshalled := make(map[string]*types.SignedMessage)
for u, msg := range *m {
marshalled[u.String()] = msg
}
return json.Marshal(marshalled)
}

func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error {
unmarshalled := make(map[string]*types.SignedMessage)
err := json.Unmarshal(b, &unmarshalled)
if err != nil {
return err
}
*m = make(map[uuid.UUID]*types.SignedMessage)
for suid, msg := range unmarshalled {
u, err := uuid.Parse(suid)
if err != nil {
return err
}
(*m)[u] = msg
}
return nil
}
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
16 changes: 12 additions & 4 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var (
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTooManyPendingMessages = errors.New("too many pending messages for actor")
ErrNonceGap = errors.New("unfulfilled nonce gap")
ErrExistingNonce = errors.New("message with nonce already exists")
)

const (
Expand Down Expand Up @@ -276,7 +277,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted
}
} else {
return false, xerrors.Errorf("message from %s with nonce %d already in mpool: %w",
m.Message.From, m.Message.Nonce, ErrSoftValidationFailure)
m.Message.From, m.Message.Nonce, ErrExistingNonce)
}

ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)
Expand Down Expand Up @@ -667,7 +668,9 @@ func (mp *MessagePool) verifyMsgBeforeAdd(ctx context.Context, m *types.SignedMe
return publish, nil
}

func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
// Push checks the signed message for any violations, adds the message to the message pool and
// publishes the message if the publish flag is set
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage, publish bool) (cid.Cid, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a similar change for PushUntrusted. Is that intentional?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, could you leave a clear comment on what exactly this bool does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need it for PushUntrusted since its not used in syncing the message pool. I think this API should really be folded back into Push itself and add a untrusted param to it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I definitely agree :)

done := metrics.Timer(ctx, metrics.MpoolPushDuration)
defer done()

Expand All @@ -683,14 +686,14 @@ func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Ci
}()

mp.curTsLk.Lock()
publish, err := mp.addTs(ctx, m, mp.curTs, true, false)
ok, err := mp.addTs(ctx, m, mp.curTs, true, false)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
}
mp.curTsLk.Unlock()

if publish {
if ok && publish {
msgb, err := m.Serialize()
if err != nil {
return cid.Undef, xerrors.Errorf("error serializing message: %w", err)
Expand Down Expand Up @@ -1583,3 +1586,8 @@ func getBaseFeeLowerBound(baseFee, factor types.BigInt) types.BigInt {

return baseFeeLowerBound
}

type MpoolNonceAPI interface {
GetNonce(context.Context, address.Address, types.TipSetKey) (uint64, error)
GetActor(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
}
8 changes: 4 additions & 4 deletions chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestLoadLocal(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
cid, err := mp.Push(context.TODO(), m)
cid, err := mp.Push(context.TODO(), m, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestClearAll(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
_, err := mp.Push(context.TODO(), m)
_, err := mp.Push(context.TODO(), m, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -676,7 +676,7 @@ func TestClearNonLocal(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
_, err := mp.Push(context.TODO(), m)
_, err := mp.Push(context.TODO(), m, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -749,7 +749,7 @@ func TestUpdates(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
_, err := mp.Push(context.TODO(), m)
_, err := mp.Push(context.TODO(), m, true)
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 2 additions & 3 deletions chain/messagepool/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/lotus/chain/messagesigner"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -42,7 +41,7 @@ type mpoolProvider struct {
sm *stmgr.StateManager
ps *pubsub.PubSub

lite messagesigner.MpoolNonceAPI
lite MpoolNonceAPI
}

var _ Provider = (*mpoolProvider)(nil)
Expand All @@ -51,7 +50,7 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
return &mpoolProvider{sm: sm, ps: ps}
}

func NewProviderLite(sm *stmgr.StateManager, ps *pubsub.PubSub, noncer messagesigner.MpoolNonceAPI) Provider {
func NewProviderLite(sm *stmgr.StateManager, ps *pubsub.PubSub, noncer MpoolNonceAPI) Provider {
return &mpoolProvider{sm: sm, ps: ps, lite: noncer}
}

Expand Down
2 changes: 1 addition & 1 deletion chain/messagepool/repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestRepubMessages(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
_, err := mp.Push(context.TODO(), m)
_, err := mp.Push(context.TODO(), m, true)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading