diff --git a/ethstorage/p2p/node.go b/ethstorage/p2p/node.go
index 7d2348a1..87f674c2 100644
--- a/ethstorage/p2p/node.go
+++ b/ethstorage/p2p/node.go
@@ -148,7 +148,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.EsConfig,
 		}
 	}
 	go n.syncCl.ReportPeerSummary()
-	n.syncSrv = protocol.NewSyncServer(rollupCfg, storageManager, m)
+	n.syncSrv = protocol.NewSyncServer(rollupCfg, storageManager, db, m)
 	blobByRangeHandler := protocol.MakeStreamHandler(resourcesCtx, log.New("serve", "blobs_by_range"), n.syncSrv.HandleGetBlobsByRangeRequest)
 	n.host.SetStreamHandler(protocol.GetProtocolID(protocol.RequestBlobsByRangeProtocolID, rollupCfg.L2ChainID), blobByRangeHandler)
@@ -259,6 +259,9 @@ func (n *NodeP2P) Close() error {
 				result = multierror.Append(result, fmt.Errorf("failed to close p2p sync client cleanly: %w", err))
 			}
 		}
+		if n.syncSrv != nil {
+			n.syncSrv.Close()
+		}
 	}
 	return result.ErrorOrNil()
 }
diff --git a/ethstorage/p2p/protocol/sync_test.go b/ethstorage/p2p/protocol/sync_test.go
index 9c8c51cf..85307e8f 100644
--- a/ethstorage/p2p/protocol/sync_test.go
+++ b/ethstorage/p2p/protocol/sync_test.go
@@ -371,10 +371,10 @@ func createLocalHostAndSyncClient(t *testing.T, testLog log.Logger, rollupCfg *r
 }
 
 func createRemoteHost(t *testing.T, ctx context.Context, rollupCfg *rollup.EsConfig,
-	storageManager *mockStorageManagerReader, metrics SyncServerMetrics, testLog log.Logger) host.Host {
+	storageManager *mockStorageManagerReader, db ethdb.Database, metrics SyncServerMetrics, testLog log.Logger) host.Host {
 	remoteHost := getNetHost(t)
-	syncSrv := NewSyncServer(rollupCfg, storageManager, metrics)
+	syncSrv := NewSyncServer(rollupCfg, storageManager, db, metrics)
 	blobByRangeHandler := MakeStreamHandler(ctx, testLog, syncSrv.HandleGetBlobsByRangeRequest)
 	remoteHost.SetStreamHandler(GetProtocolID(RequestBlobsByRangeProtocolID, rollupCfg.L2ChainID), blobByRangeHandler)
 	blobByListHandler := MakeStreamHandler(ctx, testLog, syncSrv.HandleGetBlobsByListRequest)
@@ -563,7 +563,7 @@ func TestSync_RequestL2Range(t *testing.T) {
 		t.Fatal("Download blob metadata failed", "error", err)
 		return
 	}
-	remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, m, testLog)
+	remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, db, m, testLog)
 	connect(t, localHost, remoteHost, shards, shards)
 	time.Sleep(2 * time.Second)
@@ -635,7 +635,7 @@ func TestSync_RequestL2List(t *testing.T) {
 		t.Fatal("Download blob metadata failed", "error", err)
 		return
 	}
-	remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, m, testLog)
+	remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, db, m, testLog)
 	connect(t, localHost, remoteHost, shards, shards)
 
 	indexes := make([]uint64, 0)
@@ -686,18 +686,20 @@ func TestSaveAndLoadSyncStatus(t *testing.T) {
 	syncCl.tasks[0].healTask.insert(indexes)
 	syncCl.tasks[0].SubTasks[0].First = 1
 	syncCl.tasks[0].SubTasks[0].next = 33
+	syncCl.tasks[0].state.BlobsSynced = 30
+	syncCl.tasks[0].state.SyncedSeconds = expectedSecondsUsed
 	syncCl.tasks[1].SubTasks = make([]*subTask, 0)
+	syncCl.tasks[1].state.BlobsSynced = entries
+	syncCl.tasks[1].state.SyncedSeconds = expectedSecondsUsed
 	tasks := syncCl.tasks
 	syncCl.cleanTasks()
 	if !syncCl.tasks[1].done {
 		t.Fatalf("task 1 should be done.")
 	}
-	syncCl.totalSecondsUsed = expectedSecondsUsed
 	syncCl.saveSyncStatus(true)
 
 	syncCl.tasks = make([]*task, 0)
-	syncCl.totalSecondsUsed = 0
 	syncCl.loadSyncStatus()
 	tasks[0].healTask.Indexes = make(map[uint64]int64)
 	tasks[0].SubTasks[0].First = 5
@@ -706,8 +708,17 @@ func TestSaveAndLoadSyncStatus(t *testing.T) {
 	if err := compareTasks(tasks, syncCl.tasks); err != nil {
 		t.Fatalf("compare kv task fail. err: %s", err.Error())
 	}
-	if syncCl.totalSecondsUsed != expectedSecondsUsed {
-		t.Fatalf("compare totalSecondsUsed fail, expect")
+	if syncCl.tasks[0].state.BlobsSynced != 30 {
+		t.Fatalf("compare BlobsSynced fail, expect %d, real %d", 30, syncCl.tasks[0].state.BlobsSynced)
+	}
+	if syncCl.tasks[0].state.SyncedSeconds != expectedSecondsUsed {
+		t.Fatalf("compare SyncedSeconds fail, expect %d, real %d", expectedSecondsUsed, syncCl.tasks[0].state.SyncedSeconds)
+	}
+	if syncCl.tasks[1].state.BlobsSynced != entries {
+		t.Fatalf("compare BlobsSynced fail, expect %d, real %d", entries, syncCl.tasks[1].state.BlobsSynced)
+	}
+	if syncCl.tasks[1].state.SyncedSeconds != expectedSecondsUsed {
+		t.Fatalf("compare SyncedSeconds fail, expect %d, real %d", expectedSecondsUsed, syncCl.tasks[1].state.SyncedSeconds)
 	}
 }
@@ -812,7 +823,7 @@ func testSync(t *testing.T, chunkSize, kvSize, kvEntries uint64, localShards []u
 		}
 		rShardMap := make(map[common.Address][]uint64)
 		rShardMap[contract] = rPeer.shards
-		remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, m, testLog)
+		remoteHost := createRemoteHost(t, ctx, rollupCfg, smr, db, m, testLog)
 		connect(t, localHost, remoteHost, localShardMap, rShardMap)
 	}
@@ -837,7 +848,7 @@ func TestSimpleSync(t *testing.T) {
 		excludedList: make(map[uint64]struct{}),
 	}}
 
-	testSync(t, defaultChunkSize, kvSize, kvEntries, []uint64{0}, lastKvIndex, defaultEncodeType, 3, remotePeers, true)
+	testSync(t, defaultChunkSize, kvSize, kvEntries, []uint64{0}, lastKvIndex, defaultEncodeType, 4, remotePeers, true)
 }
 
 // TestMultiSubTasksSync test sync process with local node support a single big (its task contains multi subTask) shard
@@ -1033,7 +1044,7 @@ func TestAddPeerDuringSyncing(t *testing.T) {
 		shardMiner:   common.Address{},
 		blobPayloads: pData,
 	}
-	remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, m, testLog)
+	remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, db, m, testLog)
 	connect(t, localHost, remoteHost0, shardMap, shardMap)
 	time.Sleep(2 * time.Second)
@@ -1051,7 +1062,7 @@ func TestAddPeerDuringSyncing(t *testing.T) {
 		shardMiner:   common.Address{},
 		blobPayloads: data[contract],
 	}
-	remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, m, testLog)
+	remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, db, m, testLog)
 	connect(t, localHost, remoteHost1, shardMap, shardMap)
 
 	checkStall(t, 3, mux, cancel)
@@ -1104,7 +1115,7 @@ func TestCloseSyncWhileFillEmpty(t *testing.T) {
 	time.Sleep(10 * time.Millisecond)
 	syncCl.Close()
-	t.Log("Fill empty status", "filled", syncCl.emptyBlobsFilled, "toFill", syncCl.emptyBlobsToFill)
+	t.Log("Fill empty status", "filled", syncCl.tasks[0].state.EmptyFilled, "toFill", syncCl.tasks[0].state.EmptyToFill)
 	if syncCl.syncDone {
 		t.Fatalf("fill empty should be cancel")
 	}
@@ -1169,7 +1180,7 @@ func TestAddPeerAfterSyncDone(t *testing.T) {
 		shardMiner:   common.Address{},
 		blobPayloads: data[contract],
 	}
-	remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, m, testLog)
+	remoteHost0 := createRemoteHost(t, ctx, rollupCfg, smr0, db, m, testLog)
 	connect(t, localHost, remoteHost0, shardMap, shardMap)
 
 	checkStall(t, 3, mux, cancel)
@@ -1187,7 +1198,7 @@ func TestAddPeerAfterSyncDone(t *testing.T) {
 		shardMiner:   common.Address{},
 		blobPayloads: data[contract],
 	}
-	remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, m, testLog)
+	remoteHost1 := createRemoteHost(t, ctx, rollupCfg, smr1, db, m, testLog)
 	connect(t, localHost, remoteHost1, shardMap, shardMap)
 	time.Sleep(10 * time.Millisecond)
@@ -1199,7 +1210,7 @@ func TestAddPeerAfterSyncDone(t *testing.T) {
 func TestFillEmpty(t *testing.T) {
 	var (
 		kvSize      = defaultChunkSize
-		kvEntries   = uint64(512)
+		kvEntries   = uint64(256)
 		lastKvIndex = uint64(12)
 		db          = rawdb.NewMemoryDatabase()
 		mux         = new(event.Feed)
@@ -1245,10 +1256,10 @@ func TestFillEmpty(t *testing.T) {
 	if len(syncCl.tasks[0].SubEmptyTasks) > 0 {
 		t.Fatalf("fill empty should be done")
 	}
-	if syncCl.emptyBlobsToFill != 0 {
-		t.Fatalf("emptyBlobsToFill should be 0, value %d", syncCl.emptyBlobsToFill)
+	if syncCl.tasks[0].state.EmptyToFill != 0 {
+		t.Fatalf("emptyBlobsToFill should be 0, value %d", syncCl.tasks[0].state.EmptyToFill)
 	}
-	if syncCl.emptyBlobsFilled != (kvEntries - lastKvIndex) {
-		t.Fatalf("emptyBlobsFilled is wrong, expect %d, value %d", kvEntries-lastKvIndex, syncCl.emptyBlobsFilled)
+	if syncCl.tasks[0].state.EmptyFilled != (kvEntries - lastKvIndex) {
+		t.Fatalf("emptyBlobsFilled is wrong, expect %d, value %d", kvEntries-lastKvIndex, syncCl.tasks[0].state.EmptyFilled)
 	}
 }
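The updated TestSaveAndLoadSyncStatus above now checks per-shard SyncState values after a save/load round trip instead of the old client-wide counters. Below is a minimal, self-contained sketch of that round trip; the trimmed SyncState, the in-memory map standing in for the node's ethdb, and the literal key are illustrative stand-ins, not code from this patch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SyncState here mirrors only the two fields the test asserts on.
type SyncState struct {
	BlobsSynced   uint64 `json:"blobs_synced"`
	SyncedSeconds uint64 `json:"sync_seconds"`
}

func main() {
	// memDB stands in for the node's key-value database.
	memDB := map[string][]byte{}
	statusKey := "SyncStatusKey" // assumed key name, matching SyncStatusKey in the diff

	// Save: one SyncState per shard id, as saveSyncStatus now does.
	saved := map[uint64]*SyncState{
		0: {BlobsSynced: 30, SyncedSeconds: 1000},
		1: {BlobsSynced: 16, SyncedSeconds: 1000},
	}
	enc, err := json.Marshal(saved)
	if err != nil {
		panic(err)
	}
	memDB[statusKey] = enc

	// Load: decode the same record and verify it round-trips, the way the
	// updated test asserts per task instead of per client.
	var loaded map[uint64]*SyncState
	if err := json.Unmarshal(memDB[statusKey], &loaded); err != nil {
		panic(err)
	}
	for shard, st := range loaded {
		fmt.Printf("shard %d: synced=%d seconds=%d\n", shard, st.BlobsSynced, st.SyncedSeconds)
	}
}
```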
diff --git a/ethstorage/p2p/protocol/syncclient.go b/ethstorage/p2p/protocol/syncclient.go
index c8abf82a..871e2c8a 100644
--- a/ethstorage/p2p/protocol/syncclient.go
+++ b/ethstorage/p2p/protocol/syncclient.go
@@ -57,7 +57,8 @@ const (
 var (
 	maxKvCountPerReq = uint64(16)
-	syncStatusKey    = []byte("SyncStatus")
+	SyncStatusKey    = []byte("SyncStatusKey")
+	SyncTasksKey     = []byte("SyncStatus") // TODO: this is the legacy key value; change it before the next testnet
 	maxFillEmptyTaskTreads = 1
 	requestTimeoutInMillisecond = 1000 * time.Millisecond // Millisecond
 )
@@ -176,12 +177,6 @@ type SyncClient struct {
 	logTime  time.Time // Time instance when status was last reported
 	saveTime time.Time // Time instance when state was last saved to DB
 
 	storageManager StorageManager
-
-	totalSecondsUsed uint64
-	blobsSynced      uint64
-	syncedBytes      common.StorageSize
-	emptyBlobsToFill uint64
-	emptyBlobsFilled uint64
 }
 
@@ -234,41 +229,39 @@ func (s *SyncClient) setSyncDone() {
 	if s.mux != nil {
 		s.mux.Send(EthStorageSyncDone{DoneType: AllShardDone})
 	}
-	log.Info("Sync done", "timeUsed", s.totalSecondsUsed)
+	log.Info("Sync done")
 }
 
 func (s *SyncClient) loadSyncStatus() {
-	// Start a fresh sync for retrieval.
-	s.blobsSynced, s.syncedBytes = 0, 0
-	s.emptyBlobsToFill, s.emptyBlobsFilled = 0, 0
-	s.totalSecondsUsed = 0
 	var progress SyncProgress
-	if status, _ := s.db.Get(syncStatusKey); status != nil {
+	if status, _ := s.db.Get(SyncTasksKey); status != nil {
 		if err := json.Unmarshal(status, &progress); err != nil {
 			log.Error("Failed to decode storage sync status", "err", err)
 		} else {
-			for _, task := range progress.Tasks {
-				log.Debug("Load sync subTask", "contract", task.Contract.Hex(),
-					"shard", task.ShardId, "count", len(task.SubTasks))
-				task.healTask = &healTask{
+			for _, t := range progress.Tasks {
+				log.Debug("Load sync subTask", "contract", t.Contract.Hex(),
+					"shard", t.ShardId, "count", len(t.SubTasks))
+				t.healTask = &healTask{
 					Indexes: make(map[uint64]int64),
-					task:    task,
+					task:    t,
 				}
-				task.statelessPeers = make(map[peer.ID]struct{})
-				task.peers = make(map[peer.ID]struct{})
-				for _, sTask := range task.SubTasks {
-					sTask.task = task
+				t.statelessPeers = make(map[peer.ID]struct{})
+				for _, sTask := range t.SubTasks {
+					sTask.task = t
 					sTask.next = sTask.First
 				}
-				for _, sEmptyTask := range task.SubEmptyTasks {
-					sEmptyTask.task = task
-					s.emptyBlobsToFill += sEmptyTask.Last - sEmptyTask.First
+				for _, sEmptyTask := range t.SubEmptyTasks {
+					sEmptyTask.task = t
				}
			}
-			s.blobsSynced, s.syncedBytes = progress.BlobsSynced, progress.SyncedBytes
-			s.emptyBlobsFilled = progress.EmptyBlobsFilled
-			s.totalSecondsUsed = progress.TotalSecondsUsed
+		}
+	}
+
+	var states map[uint64]*SyncState
+	if status, _ := s.db.Get(SyncStatusKey); status != nil {
+		if err := json.Unmarshal(status, &states); err != nil {
+			log.Error("Failed to decode storage sync status", "err", err)
 		}
 	}
@@ -276,9 +269,30 @@ func (s *SyncClient) loadSyncStatus() {
 	lastKvIndex := s.storageManager.LastKvIndex()
 	for _, sid := range s.storageManager.Shards() {
 		exist := false
-		for _, task := range progress.Tasks {
-			if task.Contract == s.storageManager.ContractAddress() && task.ShardId == sid {
-				s.tasks = append(s.tasks, task)
+		for _, t := range progress.Tasks {
+			if t.Contract == s.storageManager.ContractAddress() && t.ShardId == sid {
+				if states != nil {
+					if state, ok := states[t.ShardId]; ok {
+						t.state = state
+					}
+				}
+				if t.state == nil {
+					// TODO: if t.state is nil, the status was saved by the old format;
+					// seed SyncState from the legacy progress values to stay compatible.
+					// This fallback can be removed once the public test is done.
+					t.state = &SyncState{
+						PeerCount:         0,
+						BlobsToSync:       0,
+						BlobsSynced:       progress.BlobsSynced,
+						SyncProgress:      0,
+						SyncedSeconds:     progress.TotalSecondsUsed,
+						EmptyFilled:       progress.EmptyBlobsFilled,
+						EmptyToFill:       0,
+						FillEmptySeconds:  progress.TotalSecondsUsed,
+						FillEmptyProgress: 0,
+					}
+				}
+				s.tasks = append(s.tasks, t)
 				exist = true
 				continue
 			}
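For readers following the loadSyncStatus change above: when no per-shard record is found under SyncStatusKey, the legacy aggregate SyncProgress fields are folded into a fresh SyncState. The sketch below isolates that fallback; it assumes the SyncProgress and SyncState types defined elsewhere in this patch, and the helper name is hypothetical (it would live in the protocol package).

```go
// stateFromLegacy seeds a per-shard SyncState from the old client-wide
// progress record, so nodes upgraded mid-sync keep their counters.
func stateFromLegacy(p *SyncProgress) *SyncState {
	return &SyncState{
		BlobsSynced:      p.BlobsSynced,      // old client-wide counter reused as the shard's count
		SyncedSeconds:    p.TotalSecondsUsed, // time already spent is copied as-is
		EmptyFilled:      p.EmptyBlobsFilled,
		FillEmptySeconds: p.TotalSecondsUsed,
		// BlobsToSync, EmptyToFill and both progress ratios are left at zero;
		// the next report() pass recomputes them from the remaining sub-tasks.
	}
}
```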
@@ -287,8 +301,8 @@ func (s *SyncClient) loadSyncStatus() {
 			continue
 		}
 
-		task := s.createTask(sid, lastKvIndex)
-		s.tasks = append(s.tasks, task)
+		t := s.createTask(sid, lastKvIndex)
+		s.tasks = append(s.tasks, t)
 	}
 }
@@ -298,7 +312,17 @@ func (s *SyncClient) createTask(sid uint64, lastKvIndex uint64) *task {
 		ShardId:        sid,
 		nextIdx:        0,
 		statelessPeers: make(map[peer.ID]struct{}),
-		peers:          make(map[peer.ID]struct{}),
+		state: &SyncState{
+			PeerCount:         0,
+			BlobsToSync:       0,
+			BlobsSynced:       0,
+			SyncProgress:      0,
+			SyncedSeconds:     0,
+			EmptyFilled:       0,
+			EmptyToFill:       0,
+			FillEmptySeconds:  0,
+			FillEmptyProgress: 0,
+		},
 	}
 
 	healTask := healTask{
@@ -343,7 +367,7 @@ func (s *SyncClient) createTask(sid uint64, lastKvIndex uint64) *task {
 	subEmptyTasks := make([]*subEmptyTask, 0)
 	if limitForEmpty > 0 {
-		s.emptyBlobsToFill += limitForEmpty - firstEmpty
+		task.state.EmptyToFill = limitForEmpty - firstEmpty
 		maxEmptyTaskSize := (limitForEmpty - firstEmpty + uint64(maxFillEmptyTaskTreads) - 1) / uint64(maxFillEmptyTaskTreads)
 		if maxEmptyTaskSize < minSubTaskSize {
 			maxEmptyTaskSize = minSubTaskSize
@@ -381,21 +405,35 @@ func (s *SyncClient) saveSyncStatus(force bool) {
 	defer s.lock.Unlock()
 	// Store the actual progress markers
 	progress := &SyncProgress{
-		Tasks:            s.tasks,
-		BlobsSynced:      s.blobsSynced,
-		SyncedBytes:      s.syncedBytes,
-		EmptyBlobsToFill: s.emptyBlobsToFill,
-		EmptyBlobsFilled: s.emptyBlobsFilled,
-		TotalSecondsUsed: s.totalSecondsUsed,
+		Tasks: s.tasks,
+		// TODO: remove these legacy fields before the next testnet
+		BlobsSynced:      0,
+		SyncedBytes:      0,
+		EmptyBlobsToFill: 0,
+		EmptyBlobsFilled: 0,
+		TotalSecondsUsed: 0,
 	}
 	status, err := json.Marshal(progress)
 	if err != nil {
 		panic(err) // This can only fail during implementation
 	}
-	if err := s.db.Put(syncStatusKey, status); err != nil {
-		log.Error("Failed to store sync status", "err", err)
+	if err := s.db.Put(SyncTasksKey, status); err != nil {
+		log.Error("Failed to store sync tasks", "err", err)
 	}
 	log.Debug("Save sync state to DB")
+
+	// save sync states to DB for status reporting
+	states := make(map[uint64]*SyncState)
+	for _, t := range s.tasks {
+		states[t.ShardId] = t.state
+	}
+	status, err = json.Marshal(states)
+	if err != nil {
+		panic(err) // This can only fail during implementation
+	}
+	if err := s.db.Put(SyncStatusKey, status); err != nil {
+		log.Error("Failed to store sync states", "err", err)
+	}
 }
 
 // cleanTasks removes kv range retrieval tasks that have already been completed.
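saveSyncStatus above now writes two records: the task layout stays under the legacy key so existing databases remain readable, and the per-shard reporting states go under the new key. A reduced sketch of that two-record layout, assuming the SyncProgress and SyncState types from this patch and encoding/json in scope; the putter interface and helper name are illustrative, with a tiny interface standing in for ethdb:

```go
// putter is the minimal write surface needed from the node database.
type putter interface {
	Put(key, value []byte) error
}

// persistSync mirrors the two db.Put calls in saveSyncStatus: the task layout
// keeps the legacy "SyncStatus" key (SyncTasksKey), while the per-shard states
// are stored under the new "SyncStatusKey" (SyncStatusKey).
func persistSync(db putter, progress *SyncProgress, states map[uint64]*SyncState) error {
	tasksBlob, err := json.Marshal(progress)
	if err != nil {
		return err
	}
	if err := db.Put([]byte("SyncStatus"), tasksBlob); err != nil {
		return err
	}
	statesBlob, err := json.Marshal(states)
	if err != nil {
		return err
	}
	return db.Put([]byte("SyncStatusKey"), statesBlob)
}
```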
@@ -438,8 +476,6 @@ func (s *SyncClient) cleanTasks() {
 	if allDone {
 		s.setSyncDone()
 		log.Info("Storage sync done", "subTaskCount", len(s.tasks))
-
-		s.report(true)
 	}
 }
@@ -518,8 +554,8 @@ func (s *SyncClient) Close() error {
 	s.resCancel()
 	s.wg.Wait()
 	s.cleanTasks()
-	s.saveSyncStatus(true)
 	s.report(true)
+	s.saveSyncStatus(true)
 	return nil
 }
@@ -577,6 +613,7 @@ func (s *SyncClient) mainLoop() {
 		// Remove all completed tasks and terminate sync if everything's done
 		s.cleanTasks()
 		if s.syncDone {
+			s.report(true)
 			s.saveSyncStatus(true)
 			return
 		}
@@ -825,12 +862,8 @@ func (s *SyncClient) assignFillEmptyBlobTasks() {
 			filled := next - start
 			s.lock.Lock()
-			s.emptyBlobsFilled += filled
-			if s.emptyBlobsToFill >= filled {
-				s.emptyBlobsToFill -= filled
-			} else {
-				s.emptyBlobsToFill = 0
-			}
+			state := eTask.task.state
+			state.EmptyFilled += filled
 			eTask.First = next
 			if eTask.First >= eTask.Last {
 				eTask.done = true
@@ -906,8 +939,6 @@ func (s *SyncClient) OnBlobsByRange(res *blobsByRangeResponse) {
 		return
 	}
 
-	s.blobsSynced += synced
-	s.syncedBytes += common.StorageSize(syncedBytes)
 	s.metrics.ClientOnBlobsByRange(req.peer.String(), reqCount, uint64(len(res.Blobs)), synced, time.Since(start))
 	log.Debug("Persisted set of kvs", "count", synced, "bytes", syncedBytes)
@@ -934,6 +965,8 @@ func (s *SyncClient) OnBlobsByRange(res *blobsByRangeResponse) {
 		}
 	}
 	s.lock.Lock()
+	state := req.subTask.task.state
+	state.BlobsSynced += uint64(len(inserted))
 	res.req.subTask.task.healTask.insert(missing)
 	if last == res.req.subTask.Last-1 {
 		res.req.subTask.done = true
@@ -989,13 +1022,13 @@ func (s *SyncClient) OnBlobsByList(res *blobsByListResponse) {
 		return
 	}
 
-	s.blobsSynced += synced
-	s.syncedBytes += common.StorageSize(syncedBytes)
 	s.metrics.ClientOnBlobsByList(req.peer.String(), uint64(len(req.indexes)), uint64(len(res.Blobs)), synced, time.Since(start))
 	log.Debug("Persisted set of kvs", "count", synced, "bytes", syncedBytes)
 
 	s.lock.Lock()
+	state := req.healTask.task.state
+	state.BlobsSynced += uint64(len(inserted))
 	// set peer to stateless peer if fail too much
 	if len(inserted) == 0 {
 		if _, ok := s.peers[req.peer]; ok {
@@ -1109,88 +1142,79 @@ func (s *SyncClient) commitBlobs(kvIndices []uint64, decodedBlobs [][]byte, comm
 // report calculates various status reports and provides it to the user.
 func (s *SyncClient) report(force bool) {
+	duration := uint64(time.Since(s.logTime).Seconds())
 	// Don't report all the events, just occasionally
-	if !force && time.Since(s.logTime) < 8*time.Second {
+	if !force && duration < 8 {
 		return
 	}
-	s.totalSecondsUsed = s.totalSecondsUsed + uint64(time.Since(s.logTime).Seconds())
 	s.logTime = time.Now()
 
-	s.reportSyncState()
-	s.reportFillEmptyState()
-}
+	s.lock.Lock()
+	defer s.lock.Unlock()
 
-func (s *SyncClient) reportSyncState() {
-	// Don't report anything until we have a meaningful progress
-	if s.blobsSynced == 0 {
-		return
-	}
-	var (
-		totalSecondsUsed = s.totalSecondsUsed
-		synced           = s.blobsSynced
-		syncedBytes      = s.syncedBytes
-		blobsToSync      = uint64(0)
-		taskRemain       = 0
-		subTaskRemain    = 0
-	)
+	s.reportSyncState(duration)
+	s.reportFillEmptyState(duration)
+}
 
+func (s *SyncClient) reportSyncState(duration uint64) {
 	for _, t := range s.tasks {
+		blobsToSync := uint64(0)
 		for _, st := range t.SubTasks {
 			blobsToSync = blobsToSync + (st.Last - st.next)
-			subTaskRemain++
 		}
-		blobsToSync = blobsToSync + uint64(t.healTask.count())
-		if !t.done {
-			taskRemain++
+		t.state.BlobsToSync = blobsToSync + uint64(t.healTask.count())
+		if t.state.BlobsSynced+t.state.BlobsToSync != 0 {
+			t.state.SyncProgress = t.state.BlobsSynced * 10000 / (t.state.BlobsSynced + t.state.BlobsToSync)
 		}
-	}
-	etaSecondsLeft := totalSecondsUsed * blobsToSync / synced
+		// If sync is complete, stop adding sync time
+		if t.state.BlobsToSync != 0 {
+			t.state.SyncedSeconds = t.state.SyncedSeconds + duration
+		}
 
-	// Create a mega progress report
-	var (
-		progress    = fmt.Sprintf("%.2f%%", float64(synced)*100/float64(blobsToSync+synced))
-		tasksRemain = fmt.Sprintf("%d@%d", taskRemain, subTaskRemain)
-		blobsSynced = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(synced), syncedBytes.TerminalString())
-	)
-	log.Info("Storage sync in progress", "progress", progress, "peerCount", len(s.peers), "tasksRemain", tasksRemain,
-		"blobsSynced", blobsSynced, "blobsToSync", blobsToSync, "timeUsed", common.PrettyDuration(time.Duration(totalSecondsUsed)*time.Second),
-		"etaTimeLeft", common.PrettyDuration(time.Duration(etaSecondsLeft)*time.Second))
-}
+		estTime := "No estimated time"
+		progress := fmt.Sprintf("%.2f%%", float64(t.state.SyncProgress)/100)
+		if t.state.BlobsSynced != 0 {
+			etaSecondsLeft := t.state.SyncedSeconds * t.state.BlobsToSync / t.state.BlobsSynced
+			estTime = common.PrettyDuration(time.Duration(etaSecondsLeft) * time.Second).String()
+		}
 
-func (s *SyncClient) reportFillEmptyState() {
-	// Don't report anything until we have a meaningful progress
-	if s.emptyBlobsFilled == 0 {
-		return
+		log.Info("Storage sync in progress", "shardId", t.ShardId, "subTaskRemain", len(t.SubTasks), "peerCount",
+			t.state.PeerCount, "progress", progress, "blobsSynced", t.state.BlobsSynced, "blobsToSync", t.state.BlobsToSync,
+			"timeUsed", common.PrettyDuration(time.Duration(t.state.SyncedSeconds)*time.Second), "etaTimeLeft", estTime)
 	}
+}
 
-	var (
-		totalSecondsUsed  = s.totalSecondsUsed
-		emptyFilled       = s.emptyBlobsFilled
-		filledBytes       = common.StorageSize(s.emptyBlobsFilled * s.storageManager.MaxKvSize())
-		emptyToFill       = s.emptyBlobsToFill
-		taskRemain        = 0
-		subFillTaskRemain = 0
-	)
-
+func (s *SyncClient) reportFillEmptyState(duration uint64) {
 	for _, t := range s.tasks {
-		if !t.done {
-			taskRemain++
+		if t.state.EmptyFilled == 0 && len(t.SubEmptyTasks) == 0 {
+			continue
+		}
+		emptyToFill := uint64(0)
+		for _, st := range t.SubEmptyTasks {
+			emptyToFill = emptyToFill + (st.Last - st.First)
+		}
+		t.state.EmptyToFill = emptyToFill
+		if t.state.EmptyFilled+t.state.EmptyToFill != 0 {
+			t.state.FillEmptyProgress = t.state.EmptyFilled * 10000 / (t.state.EmptyFilled + t.state.EmptyToFill)
+		}
+
+		// If fill empty is complete, stop adding sync time
+		if t.state.EmptyToFill != 0 {
+			t.state.FillEmptySeconds = t.state.FillEmptySeconds + duration
 		}
-		subFillTaskRemain = subFillTaskRemain + len(t.SubEmptyTasks)
-	}
-	etaSecondsLeft := totalSecondsUsed * emptyToFill / emptyFilled
 
+		estTime := "No estimated time"
+		progress := fmt.Sprintf("%.2f%%", float64(t.state.FillEmptyProgress)/100)
+		if t.state.EmptyFilled != 0 {
+			etaSecondsLeft := t.state.FillEmptySeconds * t.state.EmptyToFill / t.state.EmptyFilled
+			estTime = common.PrettyDuration(time.Duration(etaSecondsLeft) * time.Second).String()
+		}
 
-	// Create a mega progress report
-	var (
-		progress    = fmt.Sprintf("%.2f%%", float64(emptyFilled)*100/float64(emptyFilled+emptyToFill))
-		tasksRemain = fmt.Sprintf("%d@%d", taskRemain, subFillTaskRemain)
-		blobsFilled = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(emptyFilled), filledBytes.TerminalString())
-	)
-	log.Info("Storage fill empty in progress", "progress", progress, "tasksRemain", tasksRemain,
-		"emptyFilled", blobsFilled, "emptyToFill", emptyToFill, "timeUsed", common.PrettyDuration(time.Duration(totalSecondsUsed)*time.Second),
-		"etaTimeLeft", common.PrettyDuration(time.Duration(etaSecondsLeft)*time.Second))
+		log.Info("Storage fill empty in progress", "shardId", t.ShardId, "subTaskRemain", len(t.SubEmptyTasks),
+			"progress", progress, "emptyFilled", t.state.EmptyFilled, "emptyToFill", t.state.EmptyToFill, "timeUsed",
+			common.PrettyDuration(time.Duration(t.state.FillEmptySeconds)*time.Second), "etaTimeLeft", estTime)
+	}
 }
 
 func (s *SyncClient) ReportPeerSummary() {
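A quick worked example of the per-shard progress and ETA arithmetic used in reportSyncState above: progress is kept in basis points (hundredths of a percent) so it fits in a uint64, and the ETA scales the time already spent by the remaining/completed ratio. This is a standalone sketch with made-up numbers, not code from the patch.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		blobsSynced   uint64 = 30_000
		blobsToSync   uint64 = 10_000
		syncedSeconds uint64 = 600
	)

	// 30000 * 10000 / 40000 = 7500 basis points, printed as "75.00%"
	progressBps := blobsSynced * 10000 / (blobsSynced + blobsToSync)
	// 600s * 10000 / 30000 = 200s left at the observed rate
	etaSeconds := syncedSeconds * blobsToSync / blobsSynced

	fmt.Printf("progress %.2f%%, eta %s\n",
		float64(progressBps)/100, time.Duration(etaSeconds)*time.Second)
}
```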
@@ -1215,7 +1239,6 @@ func (s *SyncClient) ReportPeerSummary() {
 			return
 		}
 	}
-
 }
 
 func (s *SyncClient) needThisPeer(contractShards map[common.Address][]uint64) bool {
@@ -1233,7 +1256,7 @@ func (s *SyncClient) needThisPeer(contractShards map[common.Address][]uint64) bo
 		// - SyncClient peer count smaller than maxPeers; or
 		// - task peer count smaller than minPeersPerShard
 		// otherwise, the peer will be disconnected.
-		if len(s.peers) < s.maxPeers || len(t.peers) < s.minPeersPerShard {
+		if len(s.peers) < s.maxPeers || t.state.PeerCount < s.minPeersPerShard {
 			return true
 		}
 	}
@@ -1248,7 +1271,7 @@ func (s *SyncClient) addPeerToTask(peerID peer.ID, contractShards map[common.Add
 	for _, shard := range shards {
 		for _, t := range s.tasks {
 			if t.Contract == contract && shard == t.ShardId {
-				t.peers[peerID] = struct{}{}
+				t.state.PeerCount++
 			}
 		}
 	}
@@ -1260,7 +1283,7 @@ func (s *SyncClient) removePeerFromTask(peerID peer.ID, contractShards map[commo
 	for _, shard := range shards {
 		for _, t := range s.tasks {
 			if t.Contract == contract && shard == t.ShardId {
-				delete(t.peers, peerID)
+				t.state.PeerCount--
 			}
 		}
 	}
diff --git a/ethstorage/p2p/protocol/syncserver.go b/ethstorage/p2p/protocol/syncserver.go
index 51e9773b..55a3b79e 100644
--- a/ethstorage/p2p/protocol/syncserver.go
+++ b/ethstorage/p2p/protocol/syncserver.go
@@ -5,12 +5,14 @@ package protocol
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethstorage/go-ethstorage/ethstorage"
@@ -43,6 +45,10 @@ const (
 	maxMessageSize = 8 * 1024 * 1024
 )
 
+var (
+	providedBlobsKey = []byte("ProvidedBlobsKey")
+)
+
 // peerStat maintains rate-limiting data of a peer that requests blocks from us.
 type peerStat struct {
 	// Requests tokenizes each request to sync
@@ -59,16 +65,21 @@ type SyncServerMetrics interface {
 type SyncServer struct {
 	cfg *rollup.EsConfig
 
+	providedBlobs  map[uint64]uint64
 	storageManager StorageManagerReader
+	db             ethdb.Database
 	metrics        SyncServerMetrics
+	exitCh         chan struct{}
 
 	peerRateLimits *simplelru.LRU[peer.ID, *peerStat]
 	peerStatsLock  sync.Mutex
 
 	globalRequestsRL *rate.Limiter
+
+	lock sync.Mutex
 }
 
-func NewSyncServer(cfg *rollup.EsConfig, storageManager StorageManagerReader, m SyncServerMetrics) *SyncServer {
+func NewSyncServer(cfg *rollup.EsConfig, storageManager StorageManagerReader, db ethdb.Database, m SyncServerMetrics) *SyncServer {
 	// We should never allow over 1000 different peers to churn through quickly,
 	// so it's fine to prune rate-limit details past this.
@@ -79,13 +90,35 @@ func NewSyncServer(cfg *rollup.EsConfig, storageManager StorageManagerReader, m
 	if m == nil {
 		m = metrics.NoopMetrics
 	}
-	return &SyncServer{
+	var providedBlobs map[uint64]uint64
+	if status, _ := db.Get(providedBlobsKey); status != nil {
+		if err := json.Unmarshal(status, &providedBlobs); err != nil {
+			log.Error("Failed to decode provided blobs", "err", err)
+		}
+	}
+
+	server := SyncServer{
 		cfg:              cfg,
 		storageManager:   storageManager,
+		db:               db,
+		providedBlobs:    make(map[uint64]uint64),
+		exitCh:           make(chan struct{}),
 		metrics:          m,
 		peerRateLimits:   peerRateLimits,
 		globalRequestsRL: globalRequestsRL,
 	}
+
+	for _, shardId := range storageManager.Shards() {
+		if providedBlobs != nil {
+			if blobs, ok := providedBlobs[shardId]; ok {
+				server.providedBlobs[shardId] = blobs
+				continue
+			}
+		}
+		server.providedBlobs[shardId] = 0
+	}
+	go server.SaveProvidedBlobs()
+	return &server
 }
 
 // HandleGetBlobsByRangeRequest is a stream handler function to register the L2 unsafe payloads alt-sync protocol.
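The NewSyncServer change above restores previously persisted per-shard counters and defaults any newly served shard to zero. The same selection logic is shown in isolation below as a hedged sketch; the function name is illustrative and not part of the patch.

```go
// restoreProvidedBlobs reuses stored counts for shards the node still serves
// and starts every other shard at zero, mirroring the loop in NewSyncServer.
func restoreProvidedBlobs(stored map[uint64]uint64, shards []uint64) map[uint64]uint64 {
	counts := make(map[uint64]uint64, len(shards))
	for _, shardId := range shards {
		if stored != nil {
			if blobs, ok := stored[shardId]; ok {
				counts[shardId] = blobs
				continue
			}
		}
		counts[shardId] = 0
	}
	return counts
}
```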
@@ -175,6 +208,9 @@ func (srv *SyncServer) handleGetBlobsByRangeRequest(ctx context.Context, stream
 		}
 	}
 	srv.metrics.ServerReadBlobs(peerID.String(), read, sucRead, time.Since(start))
+	srv.lock.Lock()
+	srv.providedBlobs[req.ShardId] += uint64(len(res.Blobs))
+	srv.lock.Unlock()
 
 	recordDur := srv.metrics.ServerRecordTimeUsed("encodeResult")
 	data, err := rlp.EncodeToBytes(&res)
@@ -227,6 +263,9 @@ func (srv *SyncServer) handleGetBlobsByListRequest(ctx context.Context, stream n
 		}
 	}
 	srv.metrics.ServerReadBlobs(peerID.String(), read, sucRead, time.Since(start))
+	srv.lock.Lock()
+	srv.providedBlobs[req.ShardId] += uint64(len(res.Blobs))
+	srv.lock.Unlock()
 
 	recordDur := srv.metrics.ServerRecordTimeUsed("encodeResult")
 	data, err := rlp.EncodeToBytes(&res)
@@ -311,3 +350,38 @@ func (srv *SyncServer) HandleRequestShardList(ctx context.Context, log log.Logge
 	}
 	log.Debug("Write response done for HandleRequestShardList")
 }
+
+func (srv *SyncServer) saveProvidedBlobs() {
+	srv.lock.Lock()
+	states, err := json.Marshal(srv.providedBlobs)
+	srv.lock.Unlock()
+	if err != nil {
+		log.Error("Failed to marshal provided blobs states", "err", err)
+		return
+	}
+
+	err = srv.db.Put(providedBlobsKey, states)
+	if err != nil {
+		log.Error("Failed to store provided blobs states", "err", err)
+		return
+	}
+}
+
+func (srv *SyncServer) SaveProvidedBlobs() {
+	ticker := time.NewTicker(5 * time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			srv.saveProvidedBlobs()
+		case <-srv.exitCh:
+			log.Info("Stopped saving provided blobs for the P2P sync server")
+			return
+		}
+	}
+}
+
+func (srv *SyncServer) Close() {
+	close(srv.exitCh)
+	srv.saveProvidedBlobs()
+}
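SaveProvidedBlobs and Close above follow a common pattern: persist on a ticker, stop via a channel, then flush once more on shutdown so the last increments are not lost. Below is a reduced, runnable sketch of that pattern; periodicSaver and its fields are illustrative names, and the printed "save" stands in for json.Marshal plus db.Put.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type periodicSaver struct {
	mu     sync.Mutex
	counts map[uint64]uint64
	exitCh chan struct{}
}

func (p *periodicSaver) save() {
	p.mu.Lock()
	defer p.mu.Unlock()
	fmt.Println("saved", p.counts) // stand-in for marshaling and writing to the DB
}

func (p *periodicSaver) run(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			p.save()
		case <-p.exitCh:
			return
		}
	}
}

func (p *periodicSaver) close() {
	close(p.exitCh) // stop the background loop...
	p.save()        // ...then flush the final state once more
}

func main() {
	s := &periodicSaver{counts: map[uint64]uint64{0: 42}, exitCh: make(chan struct{})}
	go s.run(50 * time.Millisecond)
	time.Sleep(120 * time.Millisecond)
	s.close()
}
```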
diff --git a/ethstorage/p2p/protocol/task.go b/ethstorage/p2p/protocol/task.go
index 661f27ba..3b49bde6 100644
--- a/ethstorage/p2p/protocol/task.go
+++ b/ethstorage/p2p/protocol/task.go
@@ -22,7 +22,7 @@ type task struct {
 	// TODO: consider whether we need to retry those stateless peers or disconnect the peer
 	statelessPeers map[peer.ID]struct{} // Peers that failed to deliver kv Data
 
-	peers map[peer.ID]struct{}
+	state *SyncState
 
 	done bool // Flag whether the task has done
 }
@@ -125,6 +125,7 @@ func (h *healTask) getBlobIndexesForRequest(batch uint64) []uint64 {
 type SyncProgress struct {
 	Tasks []*task // The suspended kv tasks
 
+	// TODO: kept only for backward compatibility with the old status format
 	// Status report during syncing phase
 	BlobsSynced uint64             // Number of kvs downloaded
 	SyncedBytes common.StorageSize // Number of kv bytes downloaded
diff --git a/ethstorage/p2p/protocol/types.go b/ethstorage/p2p/protocol/types.go
index e7c075dd..cf3779af 100644
--- a/ethstorage/p2p/protocol/types.go
+++ b/ethstorage/p2p/protocol/types.go
@@ -141,3 +141,15 @@ type SyncerParams struct {
 	FillEmptyConcurrency  int
 	MetaDownloadBatchSize uint64
 }
+
+type SyncState struct {
+	PeerCount         int    `json:"peer_count"`
+	BlobsSynced       uint64 `json:"blobs_synced"`
+	BlobsToSync       uint64 `json:"blobs_to_sync"`
+	SyncProgress      uint64 `json:"sync_progress"`
+	SyncedSeconds     uint64 `json:"sync_seconds"`
+	EmptyFilled       uint64 `json:"empty_filled"`
+	EmptyToFill       uint64 `json:"empty_to_fill"`
+	FillEmptyProgress uint64 `json:"fill_empty_progress"`
+	FillEmptySeconds  uint64 `json:"fill_empty_seconds"`
+}
diff --git a/ethstorage/storage_manager.go b/ethstorage/storage_manager.go
index 8868477f..a109e86c 100644
--- a/ethstorage/storage_manager.go
+++ b/ethstorage/storage_manager.go
@@ -562,6 +562,7 @@ func (s *StorageManager) MaxKvSize() uint64 {
 func (s *StorageManager) MaxKvSizeBits() uint64 {
 	return s.shardManager.kvSizeBits
 }
+
 func (s *StorageManager) ChunksPerKvBits() uint64 {
 	return s.shardManager.chunksPerKvBits
 }
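For reference, a SyncState as defined in types.go above serializes with the snake_case tags shown there. The small program below prints one example record; the numbers are made up and only the struct definition is taken from the patch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type SyncState struct {
	PeerCount         int    `json:"peer_count"`
	BlobsSynced       uint64 `json:"blobs_synced"`
	BlobsToSync       uint64 `json:"blobs_to_sync"`
	SyncProgress      uint64 `json:"sync_progress"`
	SyncedSeconds     uint64 `json:"sync_seconds"`
	EmptyFilled       uint64 `json:"empty_filled"`
	EmptyToFill       uint64 `json:"empty_to_fill"`
	FillEmptyProgress uint64 `json:"fill_empty_progress"`
	FillEmptySeconds  uint64 `json:"fill_empty_seconds"`
}

func main() {
	st := SyncState{PeerCount: 3, BlobsSynced: 7500, BlobsToSync: 2500, SyncProgress: 7500, SyncedSeconds: 600}
	out, err := json.MarshalIndent(st, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```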