
Commit

Merge pull request #5350 from GFZRZK/GFZRZK/optmize/avoid_race_in_mpool_cfg

Avoid using mp.cfg directly to avoid races.
magik6k authored Jan 30, 2021
2 parents 5cfae0f + ff62b64 commit 7c7301f
Showing 4 changed files with 18 additions and 12 deletions.

chain/messagepool/config.go (7 additions, 3 deletions)

@@ -48,9 +48,13 @@ func saveConfig(cfg *types.MpoolConfig, ds dtypes.MetadataDS) error {
 }
 
 func (mp *MessagePool) GetConfig() *types.MpoolConfig {
-	mp.cfgLk.Lock()
-	defer mp.cfgLk.Unlock()
-	return mp.cfg.Clone()
+	return mp.getConfig().Clone()
+}
+
+func (mp *MessagePool) getConfig() *types.MpoolConfig {
+	mp.cfgLk.RLock()
+	defer mp.cfgLk.RUnlock()
+	return mp.cfg
 }
 
 func validateConfg(cfg *types.MpoolConfig) error {
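
The shape of the change: the exported GetConfig still clones before returning, so callers can never mutate shared state, while the new unexported getConfig takes only a read lock and hands back the current pointer. Below is a minimal, self-contained sketch of the same pattern; the MpoolConfig fields and the SetConfig writer are illustrative assumptions, since the commit only shows the read side.

    package main

    import (
        "fmt"
        "sync"
    )

    // Illustrative stand-in for types.MpoolConfig; the fields are assumptions.
    type MpoolConfig struct {
        SizeLimitHigh int
        SizeLimitLow  int
    }

    func (c *MpoolConfig) Clone() *MpoolConfig {
        cp := *c // shallow copy suffices for plain value fields
        return &cp
    }

    type MessagePool struct {
        cfgLk sync.RWMutex
        cfg   *MpoolConfig
    }

    // GetConfig hands callers a private copy, so they cannot mutate shared state.
    func (mp *MessagePool) GetConfig() *MpoolConfig {
        return mp.getConfig().Clone()
    }

    // getConfig returns the current shared pointer under a read lock.
    func (mp *MessagePool) getConfig() *MpoolConfig {
        mp.cfgLk.RLock()
        defer mp.cfgLk.RUnlock()
        return mp.cfg
    }

    // SetConfig is hypothetical here: it swaps the pointer under the write
    // lock, so concurrent readers observe either the old or the new snapshot,
    // never a half-written one.
    func (mp *MessagePool) SetConfig(cfg *MpoolConfig) {
        mp.cfgLk.Lock()
        mp.cfg = cfg.Clone()
        mp.cfgLk.Unlock()
    }

    func main() {
        mp := &MessagePool{cfg: &MpoolConfig{SizeLimitHigh: 30000, SizeLimitLow: 20000}}
        fmt.Println(mp.GetConfig().SizeLimitHigh)
    }
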
chain/messagepool/messagepool.go (2 additions, 2 deletions)

@@ -133,7 +133,7 @@ type MessagePool struct {
 	curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
 	curTs   *types.TipSet
 
-	cfgLk sync.Mutex
+	cfgLk sync.RWMutex
 	cfg   *types.MpoolConfig
 
 	api Provider
@@ -781,7 +781,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
 
 	if incr {
 		mp.currentSize++
-		if mp.currentSize > mp.cfg.SizeLimitHigh {
+		if mp.currentSize > mp.getConfig().SizeLimitHigh {
 			// send signal to prune messages if it hasnt already been sent
 			select {
 			case mp.pruneTrigger <- struct{}{}:
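
Why the field changes from sync.Mutex to sync.RWMutex: addLocked runs for every message entering the pool, so the config is read far more often than it is written, and an RWMutex lets all of those readers hold the lock concurrently, blocking only while a writer swaps the config. A small standalone sketch (the names are illustrative, not from the commit) that runs cleanly under go run -race:

    package main

    import (
        "fmt"
        "sync"
    )

    type cfgT struct{ SizeLimitHigh int }

    func main() {
        var (
            mu  sync.RWMutex
            cfg = &cfgT{SizeLimitHigh: 30000}
            wg  sync.WaitGroup
        )

        // Many concurrent readers, like addLocked's size check.
        for i := 0; i < 8; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                mu.RLock()
                _ = cfg.SizeLimitHigh
                mu.RUnlock()
            }()
        }

        // One writer swapping the snapshot, as a SetConfig would.
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            cfg = &cfgT{SizeLimitHigh: 40000}
            mu.Unlock()
        }()

        wg.Wait()
        fmt.Println("final limit:", cfg.SizeLimitHigh)
    }
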
chain/messagepool/pruning.go (6 additions, 4 deletions)

@@ -19,15 +19,16 @@ func (mp *MessagePool) pruneExcessMessages() error {
 	mp.lk.Lock()
 	defer mp.lk.Unlock()
 
-	if mp.currentSize < mp.cfg.SizeLimitHigh {
+	mpCfg := mp.getConfig()
+	if mp.currentSize < mpCfg.SizeLimitHigh {
 		return nil
 	}
 
 	select {
 	case <-mp.pruneCooldown:
 		err := mp.pruneMessages(context.TODO(), ts)
 		go func() {
-			time.Sleep(mp.cfg.PruneCooldown)
+			time.Sleep(mpCfg.PruneCooldown)
 			mp.pruneCooldown <- struct{}{}
 		}()
 		return err
@@ -53,8 +54,9 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
 	// protected actors -- not pruned
 	protected := make(map[address.Address]struct{})
 
+	mpCfg := mp.getConfig()
 	// we never prune priority addresses
-	for _, actor := range mp.cfg.PriorityAddrs {
+	for _, actor := range mpCfg.PriorityAddrs {
 		protected[actor] = struct{}{}
 	}
 
@@ -90,7 +92,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
 	})
 
 	// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
-	loWaterMark := mp.cfg.SizeLimitLow
+	loWaterMark := mpCfg.SizeLimitLow
 keepLoop:
 	for _, chain := range chains {
 		for _, m := range chain.msgs {
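
Both hunks follow the same "snapshot once, use many times" shape: each function calls getConfig a single time, keeps the result in a local mpCfg, and every later decision (the high-water check, the cooldown sleep, the priority set, the low-water mark) reads that one consistent snapshot rather than re-reading mp.cfg. Note that the goroutine in pruneExcessMessages closes over mpCfg, so even the delayed cooldown uses the config seen at prune time. A self-contained sketch of the shape; Pool, Config, and pruneOnce are illustrative stand-ins, not Lotus code:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type Config struct {
        SizeLimitHigh int
        PruneCooldown time.Duration
    }

    type Pool struct {
        cfgLk       sync.RWMutex
        cfg         *Config
        currentSize int
        cooldown    chan struct{}
    }

    func (p *Pool) getConfig() *Config {
        p.cfgLk.RLock()
        defer p.cfgLk.RUnlock()
        return p.cfg
    }

    // pruneOnce snapshots the config a single time; the goroutine's closure
    // keeps that snapshot even if cfg is swapped while the timer runs.
    func (p *Pool) pruneOnce() {
        cfg := p.getConfig() // one locked read
        if p.currentSize < cfg.SizeLimitHigh {
            return
        }
        go func() {
            time.Sleep(cfg.PruneCooldown)
            p.cooldown <- struct{}{}
        }()
    }

    func main() {
        p := &Pool{
            cfg:         &Config{SizeLimitHigh: 2, PruneCooldown: 10 * time.Millisecond},
            currentSize: 3,
            cooldown:    make(chan struct{}, 1),
        }
        p.pruneOnce()
        <-p.cooldown
        fmt.Println("cooldown elapsed using one consistent config snapshot")
    }
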
chain/messagepool/selection.go (3 additions, 3 deletions)

@@ -532,14 +532,14 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
 			log.Infow("select priority messages done", "took", dt)
 		}
 	}()
-
-	result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
+	mpCfg := mp.getConfig()
+	result := make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow)
 	gasLimit := int64(build.BlockGasLimit)
 	minGas := int64(gasguess.MinGas)
 
 	// 1. Get priority actor chains
 	var chains []*msgChain
-	priority := mp.cfg.PriorityAddrs
+	priority := mpCfg.PriorityAddrs
 	for _, actor := range priority {
 		mset, ok := pending[actor]
 		if ok {
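
A note on the trade-off visible in this file: getConfig returns the live *types.MpoolConfig rather than a clone, so hot paths like selectPriorityMessages pay only for an RLock, not an allocation. That is safe only if writers replace the pointer wholesale under cfgLk (presumably what the exported setter does; the commit does not show it) and internal callers treat the returned snapshot as read-only. External callers still go through GetConfig, which clones.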
