Skip to content

Commit

Permalink
Merge pull request #72 from CortexFoundation/ucwong
Browse files Browse the repository at this point in the history
safe starting tfs
  • Loading branch information
ucwong authored May 10, 2019
2 parents 5c39421 + d9691da commit 7d98259
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions torrentfs/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Monitor struct {
exitCh chan struct{}
terminated int32
lastNumber uint64
dirty bool
}

// NewMonitor creates a new instance of monitor.
Expand Down Expand Up @@ -93,6 +94,7 @@ func NewMonitor(flag *Config) (*Monitor, error) {
exitCh: make(chan struct{}),
terminated: 0,
lastNumber: uint64(0),
dirty: false,
}, nil
}

Expand Down Expand Up @@ -328,14 +330,33 @@ func (m *Monitor) startWork() error {

rpcClient, rpcErr := SetConnection(clientURI)
if rpcErr != nil {
log.Warn("Torrent rpc client is wrong", "uri", clientURI)
log.Error("Torrent rpc client is wrong", "uri", clientURI, "error", rpcErr)
return rpcErr
}
m.cl = rpcClient

if vaErr := m.validateStorage(); vaErr != nil {
log.Warn("Torrent invalid storage")
errCh := make(chan error)

/*if vaErr := m.validateStorage(); vaErr != nil {
log.Error("Torrent invalid storage", "error", vaErr)
return vaErr
}*/
go m.validateStorage(errCh)

for {
select {
case err := <-errCh:
if err != nil {
//return err
log.Warn("Starting torrent fs ... ...", "error", err)
go m.validateStorage(errCh)
} else {
log.Info("Torrent fs validation passed")
//break
go m.listenLatestBlock()
return nil
}
}
}

// Used for listen latest block
Expand All @@ -345,17 +366,18 @@ func (m *Monitor) startWork() error {
//}

//go m.syncLastBlock()
go m.listenLatestBlock()
//go m.listenLatestBlock()
log.Warn("... ... ...")

return nil
}

func (m *Monitor) validateStorage() error {
func (m *Monitor) validateStorage(errCh chan error) error {
m.lastNumber = m.fs.LastListenBlockNumber
end := uint64(0)

if m.lastNumber > 2048 {
end = m.lastNumber - 2048
if m.lastNumber > 4096 {
end = m.lastNumber - 4096
}

log.Info("Validate Torrent FS Storage", "last IPC listen number", m.lastNumber, "end", end, "lastest", m.fs.LastListenBlockNumber)
Expand All @@ -364,19 +386,26 @@ func (m *Monitor) validateStorage() error {
rpcBlock, rpcErr := m.rpcBlockByNumber(uint64(i))
if rpcErr != nil {
log.Warn("RPC ERROR", "error", rpcErr)
errCh <- rpcErr
return rpcErr
}

if rpcBlock == nil || rpcBlock.Hash == common.EmptyHash {
log.Warn("No block found", "number", i)
m.lastNumber = uint64(0)
return nil
m.lastNumber = uint64(i)
//errCh <- nil
//return nil
m.dirty = true
continue
} else {
m.dirty = false
}

stBlock := m.fs.GetBlockByNumber(uint64(i))
if stBlock == nil {
log.Warn("Vaidate Torrent FS Storage state failed, rescan", "number", m.lastNumber, "error", "LastListenBlockNumber not persistent", "dirty", m.fs.LastListenBlockNumber)
m.lastNumber = uint64(0)
errCh <- nil
return nil
}

Expand All @@ -388,10 +417,17 @@ func (m *Monitor) validateStorage() error {
// block in storage invalid
log.Info("Update invalid block in storage", "old hash", stBlock.Hash, "new hash", rpcBlock.Hash, "latest", m.fs.LastListenBlockNumber)
m.lastNumber = uint64(0)
errCh <- nil
return nil
}
log.Info("Validate Torrent FS Storage ended", "last IPC listen number", m.lastNumber, "end", end, "latest", m.fs.LastListenBlockNumber)

if m.dirty {
log.Warn("Torrent fs status", "dirty", m.dirty)
m.lastNumber = uint64(0)
}

errCh <- nil
return nil
}

Expand Down Expand Up @@ -438,7 +474,7 @@ func (m *Monitor) listenLatestBlock() {
//go m.syncLastBlock()
m.syncLastBlock()
// Aviod sync in full mode, fresh interval may be less.
timer.Reset(time.Millisecond * 10)
timer.Reset(time.Millisecond * 1000)

case <-m.exitCh:
return
Expand Down

0 comments on commit 7d98259

Please sign in to comment.