Merge pull request #274 from ethstorage/priv-dashboard-sync
dashboard part 1: add sync state and collect data for network dashboard
ping-ke authored Apr 22, 2024
2 parents 9f055e9 + 49c0987 commit a1de6ea
Showing 7 changed files with 270 additions and 145 deletions.
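The tests in this commit assert against per-task sync state counters (BlobsSynced, SyncedSeconds, EmptyFilled, EmptyToFill), which take over from the client-level totals (totalSecondsUsed, emptyBlobsFilled, emptyBlobsToFill) used before the change. A minimal sketch of that per-task state, for orientation; only the field names come from the diff, while the struct name and types are assumptions:

```go
package sketch

// taskState is a hypothetical stand-in for the per-task sync state the tests
// below read and write; the real definition is not part of this excerpt.
type taskState struct {
	BlobsSynced   uint64 // blobs fetched from peers for this task
	SyncedSeconds uint64 // time spent syncing this task, persisted across restarts
	EmptyFilled   uint64 // empty blobs already written for this shard
	EmptyToFill   uint64 // empty blobs still pending for this shard
}
```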
ethstorage/p2p/node.go: 5 changes (4 additions, 1 deletion)
@@ -148,7 +148,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.EsConfig,
}
}
go n.syncCl.ReportPeerSummary()
n.syncSrv = protocol.NewSyncServer(rollupCfg, storageManager, m)
n.syncSrv = protocol.NewSyncServer(rollupCfg, storageManager, db, m)

blobByRangeHandler := protocol.MakeStreamHandler(resourcesCtx, log.New("serve", "blobs_by_range"), n.syncSrv.HandleGetBlobsByRangeRequest)
n.host.SetStreamHandler(protocol.GetProtocolID(protocol.RequestBlobsByRangeProtocolID, rollupCfg.L2ChainID), blobByRangeHandler)
@@ -259,6 +259,9 @@ func (n *NodeP2P) Close() error {
result = multierror.Append(result, fmt.Errorf("failed to close p2p sync client cleanly: %w", err))
}
}
if n.syncSrv != nil {
n.syncSrv.Close()
}
}
return result.ErrorOrNil()
}
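In node.go the sync server is now constructed with the node database and closed on shutdown, which matches the commit's goal of collecting data for the network dashboard. A speculative sketch of the pattern this enables; the counter, key, and type names below are invented for illustration and are not the project's SyncServer implementation:

```go
package sketch

import (
	"encoding/binary"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/ethdb"
)

// statsServer counts served requests and flushes the counter to the injected
// database on shutdown, so a dashboard can read it later.
type statsServer struct {
	db     ethdb.KeyValueStore
	served atomic.Uint64
}

func (s *statsServer) recordServed() { s.served.Add(1) }

// Close performs a best-effort flush; the key name is an assumption.
func (s *statsServer) Close() {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, s.served.Load())
	_ = s.db.Put([]byte("dashboard/served-blobs"), buf)
}
```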
ethstorage/p2p/protocol/sync_test.go: 51 changes (31 additions, 20 deletions)
@@ -371,10 +371,10 @@ func createLocalHostAndSyncClient(t *testing.T, testLog log.Logger, rollupCfg *r
}

func createRemoteHost(t *testing.T, ctx context.Context, rollupCfg *rollup.EsConfig,
storageManager *mockStorageManagerReader, metrics SyncServerMetrics, testLog log.Logger) host.Host {
storageManager *mockStorageManagerReader, db ethdb.Database, metrics SyncServerMetrics, testLog log.Logger) host.Host {

remoteHost := getNetHost(t)
syncSrv := NewSyncServer(rollupCfg, storageManager, metrics)
syncSrv := NewSyncServer(rollupCfg, storageManager, db, metrics)
blobByRangeHandler := MakeStreamHandler(ctx, testLog, syncSrv.HandleGetBlobsByRangeRequest)
remoteHost.SetStreamHandler(GetProtocolID(RequestBlobsByRangeProtocolID, rollupCfg.L2ChainID), blobByRangeHandler)
blobByListHandler := MakeStreamHandler(ctx, testLog, syncSrv.HandleGetBlobsByListRequest)
@@ -563,7 +563,7 @@ func TestSync_RequestL2Range(t *testing.T) {
t.Fatal("Download blob metadata failed", "error", err)
return
}
remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, m, testLog)
remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, db, m, testLog)
connect(t, localHost, remoteHost, shards, shards)

time.Sleep(2 * time.Second)
@@ -635,7 +635,7 @@ func TestSync_RequestL2List(t *testing.T) {
t.Fatal("Download blob metadata failed", "error", err)
return
}
remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, m, testLog)
remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, db, m, testLog)
connect(t, localHost, remoteHost, shards, shards)

indexes := make([]uint64, 0)
@@ -686,18 +686,20 @@ func TestSaveAndLoadSyncStatus(t *testing.T) {
syncCl.tasks[0].healTask.insert(indexes)
syncCl.tasks[0].SubTasks[0].First = 1
syncCl.tasks[0].SubTasks[0].next = 33
syncCl.tasks[0].state.BlobsSynced = 30
syncCl.tasks[0].state.SyncedSeconds = expectedSecondsUsed
syncCl.tasks[1].SubTasks = make([]*subTask, 0)
syncCl.tasks[1].state.BlobsSynced = entries
syncCl.tasks[1].state.SyncedSeconds = expectedSecondsUsed

tasks := syncCl.tasks
syncCl.cleanTasks()
if !syncCl.tasks[1].done {
t.Fatalf("task 1 should be done.")
}
syncCl.totalSecondsUsed = expectedSecondsUsed
syncCl.saveSyncStatus(true)

syncCl.tasks = make([]*task, 0)
syncCl.totalSecondsUsed = 0
syncCl.loadSyncStatus()
tasks[0].healTask.Indexes = make(map[uint64]int64)
tasks[0].SubTasks[0].First = 5
@@ -706,8 +708,17 @@
if err := compareTasks(tasks, syncCl.tasks); err != nil {
t.Fatalf("compare kv task fail. err: %s", err.Error())
}
if syncCl.totalSecondsUsed != expectedSecondsUsed {
t.Fatalf("compare totalSecondsUsed fail, expect")
if syncCl.tasks[0].state.BlobsSynced != 30 {
t.Fatalf("compare BlobsSynced fail, expect %d, real %d", 30, syncCl.tasks[0].state.BlobsSynced)
}
if syncCl.tasks[0].state.SyncedSeconds != expectedSecondsUsed {
t.Fatalf("compare totalSecondsUsed fail, expect %d, real %d", expectedSecondsUsed, syncCl.tasks[0].state.SyncedSeconds)
}
if syncCl.tasks[1].state.BlobsSynced != entries {
t.Fatalf("compare BlobsSynced fail, expect %d, real %d", entries, syncCl.tasks[1].state.BlobsSynced)
}
if syncCl.tasks[1].state.SyncedSeconds != expectedSecondsUsed {
t.Fatalf("compare totalSecondsUsed fail, expect %d, real %d", expectedSecondsUsed, syncCl.tasks[1].state.SyncedSeconds)
}
}
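TestSaveAndLoadSyncStatus now checks that these per-task counters survive a save/load round trip rather than a single client-level totalSecondsUsed. In outline, the round trip the test exercises looks like the following sketch; the real saveSyncStatus/loadSyncStatus encoding and database key are not shown in this diff, so both are assumptions here:

```go
package sketch

import (
	"encoding/json"

	"github.com/ethereum/go-ethereum/ethdb"
)

// persistedTask keeps only the counters relevant to this sketch; the real task
// struct also carries heal-task indexes, sub-tasks, and shard metadata.
type persistedTask struct {
	BlobsSynced   uint64
	SyncedSeconds uint64
}

// Stand-ins for saveSyncStatus/loadSyncStatus: serialize the task list under a
// fixed key so the counters are restored after a restart.
func saveStatus(db ethdb.KeyValueStore, tasks []persistedTask) error {
	blob, err := json.Marshal(tasks)
	if err != nil {
		return err
	}
	return db.Put([]byte("es-sync-status"), blob)
}

func loadStatus(db ethdb.KeyValueStore, tasks *[]persistedTask) error {
	blob, err := db.Get([]byte("es-sync-status"))
	if err != nil {
		return err
	}
	return json.Unmarshal(blob, tasks)
}
```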

@@ -812,7 +823,7 @@ func testSync(t *testing.T, chunkSize, kvSize, kvEntries uint64, localShards []u
}
rShardMap := make(map[common.Address][]uint64)
rShardMap[contract] = rPeer.shards
remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, m, testLog)
remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, db, m, testLog)
connect(t, localHost, remoteHost, localShardMap, rShardMap)
}

@@ -837,7 +848,7 @@ func TestSimpleSync(t *testing.T) {
excludedList: make(map[uint64]struct{}),
}}

testSync(t, defaultChunkSize, kvSize, kvEntries, []uint64{0}, lastKvIndex, defaultEncodeType, 3, remotePeers, true)
testSync(t, defaultChunkSize, kvSize, kvEntries, []uint64{0}, lastKvIndex, defaultEncodeType, 4, remotePeers, true)
}

// TestMultiSubTasksSync test sync process with local node support a single big (its task contains multi subTask) shard
@@ -1033,7 +1044,7 @@ func TestAddPeerDuringSyncing(t *testing.T) {
shardMiner: common.Address{},
blobPayloads: pData,
}
remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, m, testLog)
remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, db, m, testLog)
connect(t, localHost, remoteHost0, shardMap, shardMap)
time.Sleep(2 * time.Second)

@@ -1051,7 +1062,7 @@
shardMiner: common.Address{},
blobPayloads: data[contract],
}
remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, m, testLog)
remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, db, m, testLog)
connect(t, localHost, remoteHost1, shardMap, shardMap)
checkStall(t, 3, mux, cancel)

@@ -1104,7 +1115,7 @@ func TestCloseSyncWhileFillEmpty(t *testing.T) {
time.Sleep(10 * time.Millisecond)
syncCl.Close()

t.Log("Fill empty status", "filled", syncCl.emptyBlobsFilled, "toFill", syncCl.emptyBlobsToFill)
t.Log("Fill empty status", "filled", syncCl.tasks[0].state.EmptyFilled, "toFill", syncCl.tasks[0].state.EmptyToFill)
if syncCl.syncDone {
t.Fatalf("fill empty should be cancel")
}
@@ -1169,7 +1180,7 @@ func TestAddPeerAfterSyncDone(t *testing.T) {
shardMiner: common.Address{},
blobPayloads: data[contract],
}
remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, m, testLog)
remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, db, m, testLog)
connect(t, localHost, remoteHost0, shardMap, shardMap)
checkStall(t, 3, mux, cancel)

@@ -1187,7 +1198,7 @@
shardMiner: common.Address{},
blobPayloads: data[contract],
}
remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, m, testLog)
remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, db, m, testLog)
connect(t, localHost, remoteHost1, shardMap, shardMap)

time.Sleep(10 * time.Millisecond)
@@ -1199,7 +1210,7 @@
func TestFillEmpty(t *testing.T) {
var (
kvSize = defaultChunkSize
kvEntries = uint64(512)
kvEntries = uint64(256)
lastKvIndex = uint64(12)
db = rawdb.NewMemoryDatabase()
mux = new(event.Feed)
@@ -1245,10 +1256,10 @@ func TestFillEmpty(t *testing.T) {
if len(syncCl.tasks[0].SubEmptyTasks) > 0 {
t.Fatalf("fill empty should be done")
}
if syncCl.emptyBlobsToFill != 0 {
t.Fatalf("emptyBlobsToFill should be 0, value %d", syncCl.emptyBlobsToFill)
if syncCl.tasks[0].state.EmptyToFill != 0 {
t.Fatalf("emptyBlobsToFill should be 0, value %d", syncCl.tasks[0].state.EmptyToFill)
}
if syncCl.emptyBlobsFilled != (kvEntries - lastKvIndex) {
t.Fatalf("emptyBlobsFilled is wrong, expect %d, value %d", kvEntries-lastKvIndex, syncCl.emptyBlobsFilled)
if syncCl.tasks[0].state.EmptyFilled != (kvEntries - lastKvIndex) {
t.Fatalf("emptyBlobsFilled is wrong, expect %d, value %d", kvEntries-lastKvIndex, syncCl.tasks[0].state.EmptyFilled)
}
}
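The TestFillEmpty expectation follows directly from the test parameters: every KV slot in the shard beyond lastKvIndex is empty and must be filled, so with the kvEntries value used here the expected fill count is 244. A worked version of that arithmetic, using the values from the test above:

```go
package main

import "fmt"

func main() {
	kvEntries := uint64(256)  // shard capacity used by TestFillEmpty in this commit
	lastKvIndex := uint64(12) // highest KV index holding real data
	fmt.Println(kvEntries - lastKvIndex) // 244, the value expected in state.EmptyFilled
}
```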