Skip to content

Commit

Permalink
temporary fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco4203 committed Sep 6, 2024
1 parent 9336ce1 commit e65dbfa
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 48 deletions.
69 changes: 45 additions & 24 deletions eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// a single data retrieval network packet.
type stateReq struct {
nItems uint16 // Number of items requested for download (max is 384, so uint16 is sufficient)
trieTasks map[common.Hash]*trieTask // Trie node download tasks to track previous attempts
trieTasks map[string]*trieTask // Trie node download tasks to track previous attempts
codeTasks map[common.Hash]*codeTask // Byte code download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires
Expand Down Expand Up @@ -264,7 +264,7 @@ type stateSync struct {
sched *trie.Sync // State trie sync scheduler defining the tasks
keccak crypto.KeccakState // Keccak256 hasher to verify deliveries with

trieTasks map[common.Hash]*trieTask // Set of trie node tasks currently queued for retrieval
trieTasks map[string]*trieTask // Set of trie node tasks currently queued for retrieval
codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval

numUncommitted int
Expand All @@ -282,6 +282,7 @@ type stateSync struct {
// trieTask represents a single trie node download task, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type trieTask struct {
hash common.Hash
path [][]byte
attempts map[string]struct{}
}
Expand All @@ -300,7 +301,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
root: root,
sched: state.NewStateSync(root, d.stateDB, d.stateBloom, nil),
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
trieTasks: make(map[common.Hash]*trieTask),
trieTasks: make(map[string]*trieTask),
codeTasks: make(map[common.Hash]*codeTask),
deliver: make(chan *stateReq),
cancel: make(chan struct{}),
Expand Down Expand Up @@ -456,10 +457,11 @@ func (s *stateSync) assignTasks() {
func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
// Refill available tasks from the scheduler.
if fill := n - (len(s.trieTasks) + len(s.codeTasks)); fill > 0 {
nodes, paths, codes := s.sched.Missing(fill)
for i, hash := range nodes {
s.trieTasks[hash] = &trieTask{
path: paths[i],
paths, hashes, codes := s.sched.Missing(fill)
for i, path := range paths {
s.trieTasks[path] = &trieTask{
hash: hashes[i],
path: trie.NewSyncPath([]byte(path)),
attempts: make(map[string]struct{}),
}
}
Expand All @@ -475,7 +477,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths
paths = make([]trie.SyncPath, 0, n)
codes = make([]common.Hash, 0, n)

req.trieTasks = make(map[common.Hash]*trieTask, n)
req.trieTasks = make(map[string]*trieTask, n)
req.codeTasks = make(map[common.Hash]*codeTask, n)

for hash, t := range s.codeTasks {
Expand All @@ -493,7 +495,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths
req.codeTasks[hash] = t
delete(s.codeTasks, hash)
}
for hash, t := range s.trieTasks {
for path, t := range s.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
Expand All @@ -505,11 +507,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths
// Assign the request to this peer
t.attempts[req.peer.id] = struct{}{}

nodes = append(nodes, hash)
nodes = append(nodes, t.hash)
paths = append(paths, t.path)

req.trieTasks[hash] = t
delete(s.trieTasks, hash)
req.trieTasks[path] = t
delete(s.trieTasks, path)
}
req.nItems = uint16(len(nodes) + len(codes))
return nodes, paths, codes
Expand All @@ -531,7 +533,7 @@ func (s *stateSync) process(req *stateReq) (int, error) {

// Iterate over all the delivered data and inject one-by-one into the trie
for _, blob := range req.response {
hash, err := s.processNodeData(blob)
hash, err := s.processNodeData(req.trieTasks, req.codeTasks, blob)
switch err {
case nil:
s.numUncommitted++
Expand All @@ -544,13 +546,10 @@ func (s *stateSync) process(req *stateReq) (int, error) {
default:
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
// Delete from both queues (one delivery is enough for the syncer)
delete(req.trieTasks, hash)
delete(req.codeTasks, hash)
}
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.trieTasks {
for path, task := range req.trieTasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
Expand All @@ -560,10 +559,10 @@ func (s *stateSync) process(req *stateReq) (int, error) {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", task.hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.trieTasks[hash] = task
s.trieTasks[path] = task
}
for hash, task := range req.codeTasks {
// If the node did deliver something, missing items may be due to a protocol
Expand All @@ -586,13 +585,35 @@ func (s *stateSync) process(req *stateReq) (int, error) {
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
func (s *stateSync) processNodeData(blob []byte) (common.Hash, error) {
res := trie.SyncResult{Data: blob}
//
// If multiple requests correspond to the same hash, this method will inject the
// blob as a result for the first one only, leaving the remaining duplicates to
// be fetched again.
func (s *stateSync) processNodeData(nodeTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, blob []byte) (common.Hash, error) {
var hash common.Hash
s.keccak.Reset()
s.keccak.Write(blob)
s.keccak.Read(res.Hash[:])
err := s.sched.Process(res)
return res.Hash, err
s.keccak.Read(hash[:])

if _, present := codeTasks[hash]; present {
err := s.sched.ProcessCode(trie.CodeSyncResult{
Hash: hash,
Data: blob,
})
delete(codeTasks, hash)
return hash, err
}
for path, task := range nodeTasks {
if task.hash == hash {
err := s.sched.ProcessNode(trie.NodeSyncResult{
Path: path,
Data: blob,
})
delete(nodeTasks, path)
return hash, err
}
}
return common.Hash{}, trie.ErrNotRequested
}

// updateStats bumps the various state sync progress counters and displays a log
Expand Down
69 changes: 45 additions & 24 deletions les/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// a single data retrieval network packet.
type stateReq struct {
nItems uint16 // Number of items requested for download (max is 384, so uint16 is sufficient)
trieTasks map[common.Hash]*trieTask // Trie node download tasks to track previous attempts
trieTasks map[string]*trieTask // Trie node download tasks to track previous attempts
codeTasks map[common.Hash]*codeTask // Byte code download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires
Expand Down Expand Up @@ -264,7 +264,7 @@ type stateSync struct {
sched *trie.Sync // State trie sync scheduler defining the tasks
keccak crypto.KeccakState // Keccak256 hasher to verify deliveries with

trieTasks map[common.Hash]*trieTask // Set of trie node tasks currently queued for retrieval
trieTasks map[string]*trieTask // Set of trie node tasks currently queued for retrieval
codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval

numUncommitted int
Expand All @@ -282,6 +282,7 @@ type stateSync struct {
// trieTask represents a single trie node download task, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type trieTask struct {
hash common.Hash
path [][]byte
attempts map[string]struct{}
}
Expand All @@ -300,7 +301,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
root: root,
sched: state.NewStateSync(root, d.stateDB, d.stateBloom, nil),
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
trieTasks: make(map[common.Hash]*trieTask),
trieTasks: make(map[string]*trieTask),
codeTasks: make(map[common.Hash]*codeTask),
deliver: make(chan *stateReq),
cancel: make(chan struct{}),
Expand Down Expand Up @@ -456,10 +457,11 @@ func (s *stateSync) assignTasks() {
func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
// Refill available tasks from the scheduler.
if fill := n - (len(s.trieTasks) + len(s.codeTasks)); fill > 0 {
nodes, paths, codes := s.sched.Missing(fill)
for i, hash := range nodes {
s.trieTasks[hash] = &trieTask{
path: paths[i],
paths, hashes, codes := s.sched.Missing(fill)
for i, path := range paths {
s.trieTasks[path] = &trieTask{
hash: hashes[i],
path: trie.NewSyncPath([]byte(path)),
attempts: make(map[string]struct{}),
}
}
Expand All @@ -475,7 +477,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths
paths = make([]trie.SyncPath, 0, n)
codes = make([]common.Hash, 0, n)

req.trieTasks = make(map[common.Hash]*trieTask, n)
req.trieTasks = make(map[string]*trieTask, n)
req.codeTasks = make(map[common.Hash]*codeTask, n)

for hash, t := range s.codeTasks {
Expand All @@ -493,7 +495,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths
req.codeTasks[hash] = t
delete(s.codeTasks, hash)
}
for hash, t := range s.trieTasks {
for path, t := range s.trieTasks {
// Stop when we've gathered enough requests
if len(nodes)+len(codes) == n {
break
Expand All @@ -505,11 +507,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths
// Assign the request to this peer
t.attempts[req.peer.id] = struct{}{}

nodes = append(nodes, hash)
nodes = append(nodes, t.hash)
paths = append(paths, t.path)

req.trieTasks[hash] = t
delete(s.trieTasks, hash)
req.trieTasks[path] = t
delete(s.trieTasks, path)
}
req.nItems = uint16(len(nodes) + len(codes))
return nodes, paths, codes
Expand All @@ -531,7 +533,7 @@ func (s *stateSync) process(req *stateReq) (int, error) {

// Iterate over all the delivered data and inject one-by-one into the trie
for _, blob := range req.response {
hash, err := s.processNodeData(blob)
hash, err := s.processNodeData(req.trieTasks, req.codeTasks, blob)
switch err {
case nil:
s.numUncommitted++
Expand All @@ -544,13 +546,10 @@ func (s *stateSync) process(req *stateReq) (int, error) {
default:
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
// Delete from both queues (one delivery is enough for the syncer)
delete(req.trieTasks, hash)
delete(req.codeTasks, hash)
}
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
for hash, task := range req.trieTasks {
for path, task := range req.trieTasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
Expand All @@ -560,10 +559,10 @@ func (s *stateSync) process(req *stateReq) (int, error) {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", task.hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.trieTasks[hash] = task
s.trieTasks[path] = task
}
for hash, task := range req.codeTasks {
// If the node did deliver something, missing items may be due to a protocol
Expand All @@ -586,13 +585,35 @@ func (s *stateSync) process(req *stateReq) (int, error) {
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
func (s *stateSync) processNodeData(blob []byte) (common.Hash, error) {
res := trie.SyncResult{Data: blob}
//
// If multiple requests correspond to the same hash, this method will inject the
// blob as a result for the first one only, leaving the remaining duplicates to
// be fetched again.
func (s *stateSync) processNodeData(nodeTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, blob []byte) (common.Hash, error) {
var hash common.Hash
s.keccak.Reset()
s.keccak.Write(blob)
s.keccak.Read(res.Hash[:])
err := s.sched.Process(res)
return res.Hash, err
s.keccak.Read(hash[:])

if _, present := codeTasks[hash]; present {
err := s.sched.ProcessCode(trie.CodeSyncResult{
Hash: hash,
Data: blob,
})
delete(codeTasks, hash)
return hash, err
}
for path, task := range nodeTasks {
if task.hash == hash {
err := s.sched.ProcessNode(trie.NodeSyncResult{
Path: path,
Data: blob,
})
delete(nodeTasks, path)
return hash, err
}
}
return common.Hash{}, trie.ErrNotRequested
}

// updateStats bumps the various state sync progress counters and displays a log
Expand Down

0 comments on commit e65dbfa

Please sign in to comment.