Skip to content

Commit

Permalink
les: restored setConnectedBias function
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Aug 10, 2020
1 parent 35f26f9 commit 1b0b619
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 27 deletions.
5 changes: 2 additions & 3 deletions les/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,13 @@ func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}
// So that already connected client won't be kicked out very soon and we can ensure all
// connected clients can have enough time to request or sync some data.
// When the input parameter `bias` < 0 (illegal), return error.
/*func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
if bias < time.Duration(0) {
return fmt.Errorf("bias illegal: %v less than 0", bias)
}
api.server.clientPool.setConnectedBias(bias)
return nil
}*/
//TODO fix regression
}

// AddBalance updates the balance of a client (either overwrites it or adds to it).
// It also updates the balance meta info string.
Expand Down
25 changes: 19 additions & 6 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ const (
defaultPosExpTC = 36000 // default time constant (in seconds) for exponentially reducing positive balance
defaultNegExpTC = 3600 // default time constant (in seconds) for exponentially reducing negative balance

// activeBias is applied to already connected clients So that
// defaultConnectedBias is applied to already connected clients so that
// already connected clients won't be kicked out very soon and we
// can ensure all connected clients can have enough time to request
// or sync some data.
//
// todo(rjl493456442) make it configurable. It can be the option of
// free trial time!
activeBias = time.Minute * 3
inactiveTimeout = time.Second * 10
defaultConnectedBias = time.Minute * 3
inactiveTimeout = time.Second * 10
)

var (
Expand Down Expand Up @@ -93,6 +93,7 @@ type clientPool struct {
defaultPosFactors, defaultNegFactors lps.PriceFactors
posExpTC, negExpTC uint64
minCap uint64 // The minimal capacity value allowed for any client
connectedBias time.Duration
capLimit uint64
freeClientCap uint64 // The capacity value of each free client
}
Expand Down Expand Up @@ -121,7 +122,7 @@ type clientInfo struct {
}

// newClientPool creates a new client pool
func newClientPool(lespayDb ethdb.Database, minCap, freeClientCap uint64, activeBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
func newClientPool(lespayDb ethdb.Database, minCap, freeClientCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
if minCap > freeClientCap {
panic(nil)
}
Expand All @@ -132,11 +133,12 @@ func newClientPool(lespayDb ethdb.Database, minCap, freeClientCap uint64, active
PriorityPoolSetup: priorityPoolSetup,
clock: clock,
minCap: minCap,
connectedBias: connectedBias,
freeClientCap: freeClientCap,
removePeer: removePeer,
}
pool.bt = lps.NewBalanceTracker(ns, balanceTrackerSetup, lespayDb, clock, &utils.Expirer{}, &utils.Expirer{})
pool.pp = lps.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, activeBias, 4)
pool.pp = lps.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)

// set default expiration constants used by tests
// Note: server overwrites this if token sale is active
Expand Down Expand Up @@ -242,7 +244,7 @@ func (f *clientPool) connect(peer clientPoolPeer, reqCapacity uint64) (uint64, e
}

f.ns.SetState(node, f.InactiveFlag, nodestate.Flags{}, 0)
if _, allowed := f.pp.RequestCapacity(node, reqCapacity, activeBias, true); allowed {
if _, allowed := f.pp.RequestCapacity(node, reqCapacity, f.connectedBias, true); allowed {
return reqCapacity, nil
}
if !peer.allowInactive() {
Expand All @@ -251,6 +253,17 @@ func (f *clientPool) connect(peer clientPoolPeer, reqCapacity uint64) (uint64, e
return 0, nil
}

// setConnectedBias updates the bias applied to already connected clients so
// that they won't be kicked out very soon and all connected clients have
// enough time to request or sync some data. The new value is also pushed
// down to the priority pool.
func (f *clientPool) setConnectedBias(bias time.Duration) {
	f.lock.Lock()
	f.connectedBias = bias
	f.pp.SetActiveBias(bias)
	f.lock.Unlock()
}

// disconnect should be called when a connection is terminated. If the disconnection
// was initiated by the pool itself using disconnectFn then calling disconnect is
// not necessary but permitted.
Expand Down
24 changes: 12 additions & 12 deletions les/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestConnectPaidClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
defer pool.stop()
pool.setLimits(10, uint64(10))
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand All @@ -236,7 +236,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand All @@ -256,7 +256,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
db = rawdb.NewMemoryDatabase()
)
removeFn := func(enode.ID) {} // Noop
pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestPaidClientKickedOut(t *testing.T) {
removeFn := func(id enode.ID) {
kickedCh <- int(id[0])
}
pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
pool.bt.SetExpirationTCs(0, 0)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand All @@ -296,7 +296,7 @@ func TestPaidClientKickedOut(t *testing.T) {
pool.connect(newPoolTestPeer(i, kickedCh), 1)
clock.Run(time.Millisecond)
}
clock.Run(activeBias + time.Second*11)
clock.Run(defaultConnectedBias + time.Second*11)
if cap, _ := pool.connect(newPoolTestPeer(11, kickedCh), 0); cap == 0 {
t.Fatalf("Free client should be accepted")
}
Expand All @@ -315,7 +315,7 @@ func TestConnectFreeClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
defer pool.stop()
pool.setLimits(10, uint64(10))
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand All @@ -333,7 +333,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
db = rawdb.NewMemoryDatabase()
)
removeFn := func(enode.ID) {} // Noop
pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestFreeClientKickedOut(t *testing.T) {
kicked = make(chan int, 100)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) }
pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestPositiveBalanceCalculation(t *testing.T) {
kicked = make(chan int, 10)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand All @@ -426,7 +426,7 @@ func TestDowngradePriorityClient(t *testing.T) {
kicked = make(chan int, 10)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
pool := newClientPool(db, 1, 1, activeBias, &clock, removeFn)
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, removeFn)
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1, 0, 1}, lps.PriceFactors{1, 0, 1})
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
pool.setDefaultFactors(lps.PriceFactors{1e-3, 0, 1}, lps.PriceFactors{1e-3, 0, 1})
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestInactiveClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(db, 1, 1, activeBias, &clock, func(id enode.ID) {})
pool := newClientPool(db, 1, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
defer pool.stop()
pool.setLimits(2, uint64(2))

Expand Down
17 changes: 13 additions & 4 deletions les/lespay/server/prioritypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type PriorityPool struct {
activeCount, activeCap uint64
maxCount, maxCap uint64
minCap uint64
minBias time.Duration
activeBias time.Duration
capacityStepDiv uint64
}

Expand All @@ -126,15 +126,15 @@ type ppNodeInfo struct {
}

// NewPriorityPool creates a new PriorityPool
func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, clock mclock.Clock, minCap uint64, minBias time.Duration, capacityStepDiv uint64) *PriorityPool {
func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, clock mclock.Clock, minCap uint64, activeBias time.Duration, capacityStepDiv uint64) *PriorityPool {
pp := &PriorityPool{
ns: ns,
PriorityPoolSetup: setup,
clock: clock,
activeQueue: prque.NewLazyQueue(activeSetIndex, activePriority, activeMaxPriority, clock, lazyQueueRefresh),
inactiveQueue: prque.New(inactiveSetIndex),
minCap: minCap,
minBias: minBias,
activeBias: activeBias,
capacityStepDiv: capacityStepDiv,
}

Expand Down Expand Up @@ -246,6 +246,15 @@ func (pp *PriorityPool) SetLimits(maxCount, maxCap uint64) {
}
}

// SetActiveBias sets the bias applied when trying to activate inactive nodes.
func (pp *PriorityPool) SetActiveBias(bias time.Duration) {
	pp.lock.Lock()
	defer pp.lock.Unlock()

	pp.activeBias = bias
	// Re-evaluate activation so the new bias takes effect immediately.
	// NOTE(review): tryActivate returns []capUpdate (see its signature) and
	// the result is discarded here — confirm that no pending flag/capacity
	// updates need to be applied after this call, as other call sites may do.
	pp.tryActivate()
}

// ActiveCapacity returns the total capacity of currently active nodes
func (pp *PriorityPool) ActiveCapacity() uint64 {
pp.lock.Lock()
Expand Down Expand Up @@ -452,7 +461,7 @@ func (pp *PriorityPool) tryActivate() []capUpdate {
c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
pp.markForChange(c)
pp.setCapacity(c, pp.minCap)
c.bias = pp.minBias
c.bias = pp.activeBias
pp.activeQueue.Push(c)
pp.enforceLimits()
if c.capacity > 0 {
Expand Down
2 changes: 1 addition & 1 deletion les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesSer
srv.maxCapacity = totalRecharge
}
srv.fcManager.SetCapacityLimits(srv.freeCapacity, srv.maxCapacity, srv.freeCapacity*2)
srv.clientPool = newClientPool(srv.chainDb, srv.minCapacity, srv.freeCapacity, activeBias, mclock.System{}, func(id enode.ID) { go srv.peers.disconnect(id.String()) })
srv.clientPool = newClientPool(srv.chainDb, srv.minCapacity, srv.freeCapacity, defaultConnectedBias, mclock.System{}, func(id enode.ID) { go srv.peers.disconnect(id.String()) })
srv.clientPool.setDefaultFactors(lps.PriceFactors{0, 1, 1}, lps.PriceFactors{0, 1, 1})

checkpoint := srv.latestLocalCheckpoint()
Expand Down
2 changes: 1 addition & 1 deletion les/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
}
server.costTracker, server.freeCapacity = newCostTracker(db, server.config)
server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
server.clientPool = newClientPool(db, testBufRecharge, testBufRecharge, activeBias, clock, func(id enode.ID) {})
server.clientPool = newClientPool(db, testBufRecharge, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {})
server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
if server.oracle != nil {
Expand Down

0 comments on commit 1b0b619

Please sign in to comment.