Skip to content

Commit

Permalink
Merge pull request #3991 from filecoin-project/feat/nicer-syncwait
Browse files Browse the repository at this point in the history
Make sync wait nicer
  • Loading branch information
magik6k authored Sep 24, 2020
2 parents cdda66a + 15eddf0 commit 85caa48
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 4 deletions.
2 changes: 2 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,8 @@ type ActiveSync struct {

type SyncState struct {
ActiveSyncs []ActiveSync

VMApplied uint64
}

type SyncStateStage int
Expand Down
11 changes: 11 additions & 0 deletions chain/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"reflect"
"sync/atomic"
"time"

block "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -40,6 +41,12 @@ var log = logging.Logger("vm")
var actorLog = logging.Logger("actors")
var gasOnActorExec = newGasCharge("OnActorExec", 0, 0)

// stat counters
var (
StatSends uint64
StatApplied uint64
)

// ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`.
func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) {
if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 {
Expand Down Expand Up @@ -204,6 +211,8 @@ type ApplyRet struct {
func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime,
gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) {

defer atomic.AddUint64(&StatSends, 1)

st := vm.cstate

origin := msg.From
Expand Down Expand Up @@ -312,6 +321,7 @@ func checkMessage(msg *types.Message) error {

func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
start := build.Clock.Now()
defer atomic.AddUint64(&StatApplied, 1)
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
rt.finilizeGasTracing()
return &ApplyRet{
Expand All @@ -331,6 +341,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet,
start := build.Clock.Now()
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End()
defer atomic.AddUint64(&StatApplied, 1)
msg := cmsg.VMMessage()
if span.IsRecordingEvents() {
span.AddAttributes(
Expand Down
33 changes: 31 additions & 2 deletions cli/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ var syncCheckpointCmd = &cli.Command{
}

func SyncWait(ctx context.Context, napi api.FullNode) error {
tick := time.Second / 4

lastLines := 0
ticker := time.NewTicker(tick)
defer ticker.Stop()

samples := 8
i := 0
var app, lastApp uint64

for {
state, err := napi.SyncState(ctx)
if err != nil {
Expand Down Expand Up @@ -266,7 +276,24 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
heightDiff = 0
}

fmt.Printf("\r\x1b[2KWorker %d: Base Height: %d\tTarget Height: %d\t Height diff: %d\tTarget: %s\tState: %s\tHeight: %d", working, baseHeight, theight, heightDiff, target, ss.Stage, ss.Height)
for i := 0; i < lastLines; i++ {
fmt.Print("\r\x1b[2K\x1b[A")
}

fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff)
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height)
lastLines = 2

if i%samples == 0 {
lastApp = app
app = state.VMApplied
}
if i > 0 {
fmt.Printf("Validated %d messages (%d per second)\n", state.VMApplied, (app-lastApp)*uint64(time.Second/tick)/uint64(samples))
lastLines++
}

_ = target // todo: maybe print? (creates a bunch of line wrapping issues with most tipsets)

if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) {
fmt.Println("\nDone!")
Expand All @@ -277,7 +304,9 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
case <-ctx.Done():
fmt.Println("\nExit by user")
return nil
case <-build.Clock.After(1 * time.Second):
case <-ticker.C:
}

i++
}
}
3 changes: 2 additions & 1 deletion documentation/en/api-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -4303,7 +4303,8 @@ Inputs: `null`
Response:
```json
{
"ActiveSyncs": null
"ActiveSyncs": null,
"VMApplied": 42
}
```

Expand Down
6 changes: 5 additions & 1 deletion node/impl/full/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package full

import (
"context"
"sync/atomic"

cid "github.com/ipfs/go-cid"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

Expand All @@ -28,7 +30,9 @@ type SyncAPI struct {
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
states := a.Syncer.State()

out := &api.SyncState{}
out := &api.SyncState{
VMApplied: atomic.LoadUint64(&vm.StatApplied),
}

for i := range states {
ss := &states[i]
Expand Down

0 comments on commit 85caa48

Please sign in to comment.