Skip to content

Commit

Permalink
Merge pull request #71 from CortexFoundation/ucwong
Browse files Browse the repository at this point in the history
private pr
  • Loading branch information
ucwong authored May 9, 2019
2 parents ed8f384 + e63a139 commit 5c39421
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 28 deletions.
6 changes: 4 additions & 2 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ const (
var (
hashT = reflect.TypeOf(Hash{})
addressT = reflect.TypeOf(Address{})
EmptyHash = HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
EmptyAddress = HexToAddress("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
//EmptyHash = HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
EmptyHash = HexToHash("0000000000000000000000000000000000000000000000000000000000000000")
// EmptyAddress = HexToAddress("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
EmptyAddress = HexToAddress("0000000000000000000000000000000000000000000000000000000000000000")
)

// Hash represents the 32 byte Keccak256 hash of arbitrary data.
Expand Down
87 changes: 62 additions & 25 deletions torrentfs/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,14 @@ func (m *Monitor) startWork() error {
}

func (m *Monitor) validateStorage() error {
if m.fs.LastListenBlockNumber > 2048 {
m.lastNumber = m.fs.LastListenBlockNumber - 2048
} else {
m.lastNumber = uint64(0) //m.fs.LastListenBlockNumber
}
m.lastNumber = m.fs.LastListenBlockNumber
end := uint64(0)

if m.lastNumber > 4096 {
end = m.lastNumber - 4096
if m.lastNumber > 2048 {
end = m.lastNumber - 2048
}

log.Info("Validate Torrent FS Storage", "last IPC listen number", m.lastNumber, "end", end)
log.Info("Validate Torrent FS Storage", "last IPC listen number", m.lastNumber, "end", end, "lastest", m.fs.LastListenBlockNumber)

for i := m.lastNumber; i > end; i-- {
rpcBlock, rpcErr := m.rpcBlockByNumber(uint64(i))
Expand All @@ -371,24 +367,30 @@ func (m *Monitor) validateStorage() error {
return rpcErr
}

if rpcBlock == nil || rpcBlock.Hash == common.EmptyHash {
log.Warn("No block found", "number", i)
m.lastNumber = uint64(0)
return nil
}

stBlock := m.fs.GetBlockByNumber(uint64(i))
if stBlock == nil {
log.Warn("Vaidate Torrent FS Storage state invalid", "number", m.lastNumber, "error", "LastListenBlockNumber not persistent")
//return nil
continue
log.Warn("Vaidate Torrent FS Storage state failed, rescan", "number", m.lastNumber, "error", "LastListenBlockNumber not persistent", "dirty", m.fs.LastListenBlockNumber)
m.lastNumber = uint64(0)
return nil
}

if rpcBlock.Hash.Hex() == stBlock.Hash.Hex() {
log.Warn("Validate TFS failed", "number", m.lastNumber, "rpc", rpcBlock.Hash.Hex(), "store", stBlock.Hash.Hex())
//return nil
//log.Warn("Validate TFS continue", "number", m.lastNumber, "rpc", rpcBlock.Hash.Hex(), "store", stBlock.Hash.Hex())
continue
}

// block in storage invalid
log.Info("Update invalid block in storage", "old hash", stBlock.Hash, "new hash", rpcBlock.Hash)
m.fs.WriteBlock(rpcBlock)
log.Info("Update invalid block in storage", "old hash", stBlock.Hash, "new hash", rpcBlock.Hash, "latest", m.fs.LastListenBlockNumber)
m.lastNumber = uint64(0)
return nil
}
log.Info("Validate Torrent FS Storage ended", "last IPC listen number", m.lastNumber, "end", end)
log.Info("Validate Torrent FS Storage ended", "last IPC listen number", m.lastNumber, "end", end, "latest", m.fs.LastListenBlockNumber)

return nil
}
Expand Down Expand Up @@ -494,29 +496,51 @@ func (m *Monitor) syncLastBlock() {
break
}

rpcBlock, rpcErr := m.rpcBlockByNumber(i)
if rpcErr != nil {
log.Error("Sync old block", "number", i, "error", rpcErr)
return
}

block := m.fs.GetBlockByNumber(i)
if block == nil {
rpcBlock, rpcErr := m.rpcBlockByNumber(i)
if rpcErr != nil {
log.Error("Sync old block", "number", i, "error", rpcErr)
return
}
//rpcBlock, rpcErr := m.rpcBlockByNumber(i)
//if rpcErr != nil {
// log.Error("Sync old block", "number", i, "error", rpcErr)
// return
//}

block = rpcBlock

if parseErr := m.parseBlockTorrentInfo(block, true); parseErr != nil {
/*if parseErr := m.parseBlockTorrentInfo(block, true); parseErr != nil {
log.Error("Parse new block", "number", i, "block", block, "error", parseErr)
return
}
if storeErr := m.fs.WriteBlock(block); storeErr != nil {
log.Error("Store latest block", "number", i, "error", storeErr)
return
}*/

if err := m.parseAndStore(block, true); err != nil {
log.Error("Fail to parse and storge latest block", "number", i, "error", err)
return
}

} else if parseErr := m.parseBlockTorrentInfo(block, false); parseErr != nil {
log.Error("Parse old block", "number", i, "block", block, "error", parseErr)
return
} else {
if block.Hash.Hex() == rpcBlock.Hash.Hex() {

if parseErr := m.parseBlockTorrentInfo(block, false); parseErr != nil { //dirty to do
log.Error("Parse old block", "number", i, "block", block, "error", parseErr)
return
}
} else {
//dirty tfs
if err := m.parseAndStore(rpcBlock, true); err != nil {
log.Error("Dirty tfs fail to parse and storge latest block", "number", i, "error", err)
return
}
}
}

//if (i-minNumber)%fetchBlockLogStep == 0 || i == maxNumber {
Expand All @@ -527,3 +551,16 @@ func (m *Monitor) syncLastBlock() {
m.lastNumber = maxNumber
log.Info("Torrent scan finished", "from", minNumber, "to", maxNumber, "current", uint64(currentNumber), "progress", float64(maxNumber)/float64(currentNumber), "last", m.lastNumber)
}

func (m *Monitor) parseAndStore(block *Block, flow bool) error {
if parseErr := m.parseBlockTorrentInfo(block, flow); parseErr != nil {
log.Error("Parse new block", "number", block.Number, "block", block, "error", parseErr)
return parseErr
}

if storeErr := m.fs.WriteBlock(block); storeErr != nil {
log.Error("Store latest block", "number", block.Number, "error", storeErr)
return storeErr
}
return nil
}
3 changes: 2 additions & 1 deletion torrentfs/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ func (fs *FileStorage) WriteBlock(b *Block) error {
return e
})

if err == nil && b.Number > fs.LastListenBlockNumber {
//if err == nil && b.Number > fs.LastListenBlockNumber {
if err == nil {
fs.bnLock.Lock()
fs.LastListenBlockNumber = b.Number
fs.writeBlockNumber()
Expand Down

0 comments on commit 5c39421

Please sign in to comment.